mndConsumer.c 39.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
L
Liu Jicong 已提交
18
#include "mndPrivilege.h"
L
Liu Jicong 已提交
19
#include "mndShow.h"
20
#include "mndSubscribe.h"
L
Liu Jicong 已提交
21 22
#include "mndTopic.h"
#include "mndTrans.h"
L
Liu Jicong 已提交
23
#include "tcompare.h"
L
Liu Jicong 已提交
24 25
#include "tname.h"

26
// Version tag written into every encoded consumer row; the decoder rejects any other value.
#define MND_CONSUMER_VER_NUMBER   1
// Size of the reserved tail appended to each encoded consumer row.
#define MND_CONSUMER_RESERVE_SIZE 64

// A READY consumer whose heartbeat counter (hbStatus, bumped once per timer round and reset on
// every heartbeat) exceeds this value is reported as lost.
#define MND_CONSUMER_LOST_HB_CNT          3
// A LOST_REBD consumer whose heartbeat counter exceeds this value is cleared from the sdb.
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200

// Rebalance-in-progress counter, only touched through the atomic helpers mndReb*().
static int8_t mqRebInExecCnt = 0;
33

L
Liu Jicong 已提交
34 35
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
36 37 38
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
39 40
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
static int32_t mndRetrieveConsumer(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42

S
Shengliang Guan 已提交
43 44
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
45
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
46 47
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
L
Liu Jicong 已提交
48
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
49
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
50

L
Liu Jicong 已提交
51
/*
 * Wire the consumer module into the mnode.
 *
 * Registers the TMQ consumer message handlers and the "show consumers" retrieve /
 * free-iterator hooks, then installs the SDB_CONSUMER table callbacks.
 * Returns whatever sdbSetTable returns.
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  // requests coming from consumers
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);

  // periodic timer and consumer state transitions
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // "show consumers"
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  SSdbTable consumerTable = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };

  return sdbSetTable(pMnode->pSdb, consumerTable);
}

/* Counterpart of mndInitConsumer; there is nothing to tear down, so this is intentionally empty. */
void mndCleanupConsumer(SMnode *pMnode) {
  (void)pMnode;
}

L
Liu Jicong 已提交
78
bool mndRebTryStart() {
L
Liu Jicong 已提交
79
  int8_t old = atomic_val_compare_exchange_8(&mqRebInExecCnt, 0, 1);
L
Liu Jicong 已提交
80 81 82
  return old == 0;
}

L
Liu Jicong 已提交
83
/* Release the rebalance slot taken by mndRebTryStart (atomic decrement of mqRebInExecCnt). */
void mndRebEnd() {
  atomic_sub_fetch_8(&mqRebInExecCnt, 1);
}
L
Liu Jicong 已提交
84

L
Liu Jicong 已提交
85
/* Atomically increment the rebalance-in-progress counter. */
void mndRebCntInc() {
  atomic_add_fetch_8(&mqRebInExecCnt, 1);
}
L
Liu Jicong 已提交
86

L
Liu Jicong 已提交
87
/* Atomically decrement the rebalance-in-progress counter. */
void mndRebCntDec() {
  atomic_sub_fetch_8(&mqRebInExecCnt, 1);
}
L
Liu Jicong 已提交
88

S
Shengliang Guan 已提交
89 90 91
/*
 * Handle TDMT_MND_TMQ_CONSUMER_LOST (enqueued by the timer for READY consumers that stopped heartbeating).
 *
 * Commits a CONSUMER_UPDATE__LOST update for the consumer through a rollback transaction.
 * Returns 0 when the consumer no longer exists or the transaction was prepared; -1 (terrno set)
 * when the consumer is not READY or any step fails.
 */
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
  SMnode             *pMnode = pMsg->info.node;
  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
  if (pConsumer == NULL) {
    return 0;
  }

  mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
  if (pTrans == NULL) goto _OVER;
  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _OVER;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
  code = 0;

_OVER:
  // same cleanup for success and failure; mndTransDrop is also called with NULL here, as before
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
134 135 136
/*
 * Handle TDMT_MND_TMQ_CONSUMER_RECOVER (enqueued when a LOST_REBD consumer shows up again).
 *
 * Commits a CONSUMER_UPDATE__RECOVER update through a retrying transaction.
 * Returns 0 when the transaction was prepared; -1 (terrno set where known) when the consumer is
 * missing, not in LOST_REBD state, or any step fails.
 */
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
  SMqConsumerObj        *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
  if (pConsumer == NULL) {
    mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
    return -1;
  }

  mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
        pConsumer->status, mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
  if (pTrans == NULL) goto _OVER;
  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _OVER;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
  code = 0;

_OVER:
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
/*
 * Handle TDMT_MND_TMQ_LOST_CONSUMER_CLEAR (enqueued by the timer for consumers that stayed in
 * LOST_REBD past MND_CONSUMER_LOST_CLEAR_THRESHOLD).
 *
 * Writes a DROPPED commit log for the consumer through a rollback transaction.
 * Returns 0 when the consumer no longer exists or the transaction was prepared; -1 (terrno set)
 * when the consumer is not in LOST_REBD state or any step fails.
 */
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
  SMqConsumerObj      *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
  if (pConsumer == NULL) {
    return 0;
  }

  mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  if (pTrans == NULL) goto _OVER;
  if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto _OVER;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
  code = 0;

_OVER:
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
214
/*
 * Return the rebalance entry stored in pHash under subscribe key `key`, creating an empty one
 * on first use.
 *
 * The hash stores SMqRebInfo by value: the struct from tNewSMqRebSubscribe is copied in and then
 * freed, so the arrays it points to end up owned by the hash entry.
 * Returns a pointer to the entry inside the hash, or NULL with terrno set when allocation or
 * insertion fails.
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t      keyLen = strlen(key) + 1;  // keys are stored with their terminating NUL
  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, keyLen);
  if (pRebInfo != NULL) {
    return pRebInfo;
  }

  SMqRebInfo *pNewInfo = tNewSMqRebSubscribe(key);
  if (pNewInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (taosHashPut(pHash, key, keyLen, pNewInfo, sizeof(SMqRebInfo)) != 0) {
    // nothing was copied into the hash, so the arrays created by tNewSMqRebSubscribe are still ours.
    // NOTE(review): assumes newConsumers/removedConsumers are the only allocations inside SMqRebInfo — confirm.
    taosArrayDestroy(pNewInfo->newConsumers);
    taosArrayDestroy(pNewInfo->removedConsumers);
    taosMemoryFree(pNewInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  taosMemoryFree(pNewInfo);

  return taosHashGet(pHash, key, keyLen);
}

S
Shengliang Guan 已提交
229 230
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
231 232 233 234
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

235 236
  mTrace("start to process mq timer");

237
  // rebalance cannot be parallel
L
Liu Jicong 已提交
238
  if (!mndRebTryStart()) {
239 240 241 242 243 244 245 246 247 248 249
    mInfo("mq rebalance already in progress, do nothing");
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  // TODO set cleanfp

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
250 251 252
    if (pIter == NULL) {
      break;
    }
253 254 255

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
H
Haojun Liao 已提交
256

X
Xiaoyu Wang 已提交
257 258 259
    mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", uptime:%" PRId64 ", hbstatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime,
           hbStatus);
L
Liu Jicong 已提交
260 261

    if (status == MQ_CONSUMER_STATUS__READY) {
H
Haojun Liao 已提交
262 263 264 265 266 267 268 269 270
      if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
        SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));

        pLostMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
            .msgType = TDMT_MND_TMQ_CONSUMER_LOST,
            .pCont = pLostMsg,
            .contLen = sizeof(SMqConsumerLostMsg),
        };
271

H
Haojun Liao 已提交
272 273
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
L
Liu Jicong 已提交
274
    } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
H
Haojun Liao 已提交
275
      // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
L
Liu Jicong 已提交
276 277 278 279 280 281 282 283 284
      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
        SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));

        pClearMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
            .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
            .pCont = pClearMsg,
            .contLen = sizeof(SMqConsumerClearMsg),
        };
285

L
Liu Jicong 已提交
286 287
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
288
    } else if (status == MQ_CONSUMER_STATUS__LOST) {
L
Liu Jicong 已提交
289
      taosRLockLatch(&pConsumer->lock);
290 291 292 293 294
      int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
      for (int32_t i = 0; i < topicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
295
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
296 297
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
298
      taosRUnLockLatch(&pConsumer->lock);
299
    } else if (status == MQ_CONSUMER_STATUS__MODIFY) {
L
Liu Jicong 已提交
300
      taosRLockLatch(&pConsumer->lock);
301 302 303 304 305
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
306
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
307 308 309 310 311 312 313 314
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
315
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
316 317
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
318
      taosRUnLockLatch(&pConsumer->lock);
319 320 321 322 323 324 325 326 327 328
    } else {
      // do nothing
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
L
Liu Jicong 已提交
329
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
330 331 332 333 334 335 336
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
L
Liu Jicong 已提交
337
    mTrace("mq rebalance finished, no modification");
L
Liu Jicong 已提交
338
    mndRebEnd();
339 340 341 342
  }
  return 0;
}

343
/*
 * Handle TDMT_MND_TMQ_HB from a consumer.
 *
 * Resets the consumer's heartbeat counter. If the consumer is in LOST_REBD state, it also enqueues
 * a TDMT_MND_TMQ_CONSUMER_RECOVER message. A recover message that cannot be allocated is only
 * logged: the status stays LOST_REBD, so the next heartbeat will try again.
 * Returns 0 on success, -1 (terrno set) when the request cannot be decoded or the consumer is unknown.
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  SMnode  *pMnode = pMsg->info.node;
  SMqHbReq req = {0};

  if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t         consumerId = req.consumerId;
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " not exist", consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      mError("consumer:0x%" PRIx64 " failed to alloc recover msg", consumerId);
    } else {
      pRecoverMsg->consumerId = consumerId;
      SRpcMsg rpcMsg = {
          .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
          .pCont = pRecoverMsg,
          .contLen = sizeof(SMqConsumerRecoverMsg),
      };
      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
    }
  }

  mndReleaseConsumer(pMnode, pConsumer);

  return 0;
}

S
Shengliang Guan 已提交
382
/*
 * Handle TDMT_MND_TMQ_ASK_EP: tell a consumer which vgroups it is assigned for each subscribed topic.
 *
 * Steps:
 *   0. Decode the request, acquire the consumer, and verify its consumer group.
 *   1. Reset the heartbeat counter. A LOST_REBD consumer triggers a recover message. Anything not
 *      READY is rejected with TSDB_CODE_MND_CONSUMER_NOT_READY.
 *   2. Only when the client's epoch differs from the server's, build the per-topic vgroup endpoint
 *      list (with topic schema). Otherwise the response carries just the header.
 * The encoded response goes into pMsg->info.rsp. Returns 0 on success, -1 with terrno set on error.
 * Every error path after the consumer is acquired releases it through FAIL.
 */
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  SMnode     *pMnode = pMsg->info.node;
  SMqAskEpReq req = {0};
  SMqAskEpRsp rsp = {0};

  if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t consumerId = req.consumerId;
  int32_t epoch = req.epoch;

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
  if (ret != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    goto FAIL;  // previously returned here without releasing pConsumer
  }

  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer:0x%" PRIx64, consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      mError("consumer:0x%" PRIx64 " failed to alloc recover msg", consumerId);
    } else {
      pRecoverMsg->consumerId = consumerId;
      SRpcMsg rpcMsg = {
          .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
          .pCont = pRecoverMsg,
          .contLen = sizeof(SMqConsumerRecoverMsg),
      };
      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
    }
  }

  if (status != MQ_CONSUMER_STATUS__READY) {
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    goto FAIL;  // previously returned here without releasing pConsumer
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

  // 2. check epoch, only send ep info when epochs do not match
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
          serverEpoch);
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosRUnLockLatch(&pConsumer->lock);
      goto FAIL;
    }

    // handle all topics subscribed by this consumer
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
      // txn guarantees pSub is created
      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      strcpy(topicEp.topic, topic);

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      taosRLockLatch(&pTopic->lock);
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
      topicEp.schema.nCols = pTopic->schema.nCols;
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        if (topicEp.schema.pSchema != NULL) {
          memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
        }
      }
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      if (topicEp.schema.nCols && topicEp.schema.pSchema == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        taosRUnLockLatch(&pConsumer->lock);
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        goto FAIL;
      }

      // 2.2 iterate all vg assigned to the consumer of that topic
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      int32_t        vgNum = taosArrayGetSize(pConsumerEp->vgs);

      // this customer assigned vgroups
      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        // topicEp is not yet owned by rsp.topics, so its schema copy must be freed here
        taosMemoryFree(topicEp.schema.pSchema);
        taosRUnLockLatch(&pConsumer->lock);
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }

  // encode rsp
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;  // previously leaked rsp and the consumer reference
  }

  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = serverEpoch;
  ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;

  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  tEncodeSMqAskEpRsp(&abuf, &rsp);

  // release consumer and free memory
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
  return 0;

FAIL:
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

L
Liu Jicong 已提交
541 542 543 544 545 546 547 548
/*
 * Encode pConsumer, append it to pTrans as a commit log, and mark the raw row DROPPED.
 * Returns 0 on success, -1 as soon as any step fails (later steps are skipped).
 */
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  bool     ok = pRaw != NULL && mndTransAppendCommitlog(pTrans, pRaw) == 0 &&
            sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) == 0;
  return ok ? 0 : -1;
}

549 550 551 552 553 554 555 556
/*
 * Encode pConsumer, append it to pTrans as a commit log, and mark the raw row READY.
 * Returns 0 on success, -1 as soon as any step fails (later steps are skipped).
 */
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  bool     ok = pRaw != NULL && mndTransAppendCommitlog(pTrans, pRaw) == 0 &&
            sdbSetRawStatus(pRaw, SDB_STATUS_READY) == 0;
  return ok ? 0 : -1;
}

557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577
/*
 * Check that every topic in pTopicList exists and that pUser may subscribe to it.
 * Stops at the first failure and returns -1 (terrno is left as set by the failing callee); returns 0 otherwise.
 */
static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const char* pUser) {
  int32_t total = taosArrayGetSize(pTopicList);

  for (int32_t idx = 0; idx < total; ++idx) {
    char        *pTopicName = taosArrayGetP(pTopicList, idx);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pTopicName);
    if (pTopic == NULL) {
      return -1;
    }

    int32_t privCode = mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic);
    mndReleaseTopic(pMnode, pTopic);
    if (privCode != 0) {
      return -1;
    }
  }

  return 0;
}

578 579 580 581
/* Element copier for taosArrayDup: returns a heap copy (taosStrdup) of the topic name string p. */
static void* topicNameDup(void* p) {
  char *pName = (char *)p;
  return taosStrdup(pName);
}

582 583 584 585
/*
 * Handle TDMT_MND_TMQ_SUBSCRIBE: replace a consumer's topic list.
 *
 * The requested topics are sorted and de-duplicated, then checked for existence and subscribe
 * privilege.
 * - New consumer: a MODIFY update is committed whose rebNewTopics are all requested topics.
 * - Existing READY consumer: the sorted old/new lists are merged to compute rebNewTopics and
 *   rebRemovedTopics. A MODIFY update is committed only when something changed.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when a transaction was prepared, 0 when the subscription is
 * unchanged, and -1 (terrno set) on any failure.
 */
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  char   *msgStr = pMsg->pCont;

  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);

  uint64_t        consumerId = subscribe.consumerId;
  char           *cgroup = subscribe.cgroup;
  SMqConsumerObj *pExistedConsumer = NULL;
  SMqConsumerObj *pConsumerNew = NULL;

  // stays -1 on every failure path; only the explicit success points below change it
  int32_t code = -1;
  SArray *pTopicList = subscribe.topicNames;
  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicateP(pTopicList, taosArrayCompareString, taosMemoryFree);

  int32_t newTopicNum = taosArrayGetSize(pTopicList);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
  if (pTrans == NULL) {
    goto _over;
  }

  // check topic existence and privilege. The result is not stored in `code`: the success value (0)
  // would otherwise make every later failure be reported to the client as success.
  if (validateTopics(pTopicList, pMnode, pMsg->info.conn.user) != TSDB_CODE_SUCCESS) {
    goto _over;
  }

  pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pExistedConsumer == NULL) {
    mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s", consumerId, subscribe.cgroup);

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _over;
    }
    tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    // all subscribed topics should re-balance; ownership of the topic list moves to pConsumerNew
    taosArrayDestroy(pConsumerNew->rebNewTopics);
    pConsumerNew->rebNewTopics = pTopicList;
    subscribe.topicNames = NULL;

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;

  } else {
    /*taosRLockLatch(&pExistedConsumer->lock);*/
    int32_t status = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive subscribe request from existed consumer:0x%" PRIx64 " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe.cgroup, status,mndConsumerStatusName(status), newTopicNum);

    if (status != MQ_CONSUMER_STATUS__READY) {
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto _over;
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _over;
    }

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    int32_t oldTopicNum = (pExistedConsumer->currentTopics)? taosArrayGetSize(pExistedConsumer->currentTopics):0;

    // merge two sorted lists: topics only in the new list are added, topics only in the old list are removed
    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
        char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
        continue;
      } else if (j >= newTopicNum) {
        char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
        continue;
      } else {
        char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
        char *newTopic = taosArrayGetP(pTopicList, j);
        int   comp = strcmp(oldTopic, newTopic);
        if (comp == 0) {
          i++;
          j++;
          continue;
        } else if (comp < 0) {
          char *oldTopicCopy = taosStrdup(oldTopic);
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
          continue;
        } else {
          char *newTopicCopy = taosStrdup(newTopic);
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
          continue;
        }
      }
    }

    // no topics need to be rebalanced: nothing to commit, report success
    if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
      code = 0;
      goto _over;
    }

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  mndTransDrop(pTrans);

  if (pExistedConsumer) {
    /*taosRUnLockLatch(&pExistedConsumer->lock);*/
    mndReleaseConsumer(pMnode, pExistedConsumer);
  }

  if (pConsumerNew) {
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
  }

  // TODO: replace with destroy subscribe msg
  taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
  return code;
}

L
Liu Jicong 已提交
719 720
/*
 * Serialize a consumer object into a new sdb raw row.
 *
 * Row layout, in write order:
 *   int32 payload length | payload from tEncodeSMqConsumerObj | MND_CONSUMER_RESERVE_SIZE reserved bytes
 * Returns the row, or NULL on failure (the partially built row is freed and terrno is non-zero).
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;  // treated as failed until the row is completely written

  void    *pPayload = NULL;
  int32_t  payloadLen = tEncodeSMqConsumerObj(NULL, pConsumer);  // NULL target: size-only pass
  int32_t  rawSize = sizeof(int32_t) + payloadLen + MND_CONSUMER_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto CM_ENCODE_OVER;

  pPayload = taosMemoryMalloc(payloadLen);
  if (pPayload == NULL) goto CM_ENCODE_OVER;

  void *pWritePos = pPayload;
  tEncodeSMqConsumerObj(&pWritePos, pConsumer);

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, payloadLen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pPayload, payloadLen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(pPayload);
  if (terrno != 0) {
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
  return pRaw;
}

L
Liu Jicong 已提交
755
/*
 * Deserialize an sdb raw row (layout written by mndConsumerActionEncode) into a new SSdbRow
 * holding an SMqConsumerObj.
 *
 * Rejects rows whose version differs from MND_CONSUMER_VER_NUMBER (TSDB_CODE_SDB_INVALID_DATA_VER).
 * Returns the row, or NULL with terrno set on failure.
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *buf = NULL;

  // The outcome is judged by terrno at CM_DECODE_OVER. Without this reset, an error code left over
  // from an unrelated earlier call would make a successful decode free the row and return NULL.
  terrno = TSDB_CODE_SUCCESS;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }

  if (sver != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  int32_t dataPos = 0;
  int32_t len;
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

CM_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}

L
Liu Jicong 已提交
810
/*
 * SDB insert hook for consumer rows (its log line reports "load from sdb, perform insert action").
 * Logs the loaded consumer's id, group, status and epoch, then seeds subscribeTime from upTime.
 * Always returns 0.
 */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *pStatusStr = mndConsumerStatusName(pConsumer->status);

  mDebug("consumer:0x%" PRIx64 " cgroup:%s status:%d(%s) epoch:%d load from sdb, perform insert action",
         pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, pStatusStr, pConsumer->epoch);

  // A freshly loaded consumer is treated as having subscribed at its up time.
  pConsumer->subscribeTime = pConsumer->upTime;
  return 0;
}

L
Liu Jicong 已提交
818
/*
 * SDB delete hook for consumer rows.
 * Logs the consumer id and status, then passes the object to tDeleteSMqConsumerObj().
 * That call presumably frees the arrays/buffers the object owns; this is not visible here.
 * Always returns 0.
 */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *pStatusStr = mndConsumerStatusName(pConsumer->status);

  mDebug("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
         pStatusStr);

  tDeleteSMqConsumerObj(pConsumer);
  return 0;
}

L
Liu Jicong 已提交
825
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
X
Xiaoyu Wang 已提交
826 827
  mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64,
         pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime);
L
Liu Jicong 已提交
828

829 830 831
  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
L
Liu Jicong 已提交
832 833
    /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
    /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
834

835 836 837 838 839 840 841 842 843 844
    if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
    } else {
      SArray *tmp = pOldConsumer->rebNewTopics;
      pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
      pNewConsumer->rebNewTopics = tmp;

      tmp = pOldConsumer->rebRemovedTopics;
      pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
      pNewConsumer->rebRemovedTopics = tmp;
845

846 847 848
      tmp = pOldConsumer->assignedTopics;
      pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
      pNewConsumer->assignedTopics = tmp;
L
Liu Jicong 已提交
849

850
      pOldConsumer->subscribeTime = pNewConsumer->upTime;
L
Liu Jicong 已提交
851

852 853
      pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
    }
854
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
L
Liu Jicong 已提交
855 856
    /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
    /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
L
Liu Jicong 已提交
857

858
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
L
Liu Jicong 已提交
859
    /*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/
860
    for (int32_t i = 0; i < sz; i++) {
861
      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
L
Liu Jicong 已提交
862
      taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
863
    }
L
Liu Jicong 已提交
864 865 866

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

H
Haojun Liao 已提交
867
    int32_t status = pOldConsumer->status;
868
    pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
H
Haojun Liao 已提交
869 870 871
    mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
           pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
           pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
L
Liu Jicong 已提交
872
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
L
Liu Jicong 已提交
873 874
    /*A(taosArrayGetSize(pOldConsumer->currentTopics) == 0);*/
    /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
L
Liu Jicong 已提交
875 876 877

    int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
878
      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
L
Liu Jicong 已提交
879 880 881 882 883 884
      taosArrayPush(pOldConsumer->rebNewTopics, &topic);
    }

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
885 886
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
L
Liu Jicong 已提交
887 888 889

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

890
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
L
Liu Jicong 已提交
891 892
    /*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);*/
    /*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);*/
893

894
    char *addedTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
895
    // not exist in current topic
X
Xiaoyu Wang 已提交
896

L
Liu Jicong 已提交
897
    bool existing = false;
898
#if 1
H
Haojun Liao 已提交
899 900
    int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics);
    for (int32_t i = 0; i < numOfExistedTopics; i++) {
901
      char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
L
Liu Jicong 已提交
902 903 904
      if (strcmp(topic, addedTopic) == 0) {
        existing = true;
      }
905 906 907 908 909 910 911 912 913 914 915 916 917 918
    }
#endif

    // remove from new topic
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
      if (strcmp(addedTopic, topic) == 0) {
        taosArrayRemove(pOldConsumer->rebNewTopics, i);
        taosMemoryFree(topic);
        break;
      }
    }

    // add to current topic
L
Liu Jicong 已提交
919 920 921
    if (!existing) {
      taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
M
Minglei Jin 已提交
922 923
    } else {
      taosMemoryFree(addedTopic);
L
Liu Jicong 已提交
924
    }
H
Haojun Liao 已提交
925

926
    // set status
H
Haojun Liao 已提交
927
    int32_t status = pOldConsumer->status;
928
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
H
Haojun Liao 已提交
929
      if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
930
        pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
H
Haojun Liao 已提交
931
      } else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
932 933 934
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
      }
    } else {
H
Haojun Liao 已提交
935
      if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
936
        pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
H
Haojun Liao 已提交
937
      } else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
938 939 940
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
      }
    }
L
Liu Jicong 已提交
941

H
Haojun Liao 已提交
942
    // the re-balance is triggered when the new consumer is launched.
L
Liu Jicong 已提交
943 944
    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

945
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
H
Haojun Liao 已提交
946 947 948
    mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
           pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
           pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
949
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
L
Liu Jicong 已提交
950 951
    /*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/
    /*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/
952 953 954
    char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);

    // not exist in new topic
L
Liu Jicong 已提交
955
#if 0
956 957
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
L
Liu Jicong 已提交
958
      A(strcmp(topic, removedTopic) != 0);
959 960 961 962 963
    }
#endif

    // remove from removed topic
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) {
L
fix  
Liu Jicong 已提交
964
      char *topic = taosArrayGetP(pOldConsumer->rebRemovedTopics, i);
965
      if (strcmp(removedTopic, topic) == 0) {
L
fix  
Liu Jicong 已提交
966
        taosArrayRemove(pOldConsumer->rebRemovedTopics, i);
967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983
        taosMemoryFree(topic);
        break;
      }
    }

    // remove from current topic
    int32_t i = 0;
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
    for (i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
      if (strcmp(removedTopic, topic) == 0) {
        taosArrayRemove(pOldConsumer->currentTopics, i);
        taosMemoryFree(topic);
        break;
      }
    }
    // must find the topic
L
Liu Jicong 已提交
984
    /*A(i < sz);*/
985 986

    // set status
H
Haojun Liao 已提交
987
    int32_t status = pOldConsumer->status;
988
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
H
Haojun Liao 已提交
989
      if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
990
        pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
H
Haojun Liao 已提交
991
      } else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
992 993 994
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
      }
    } else {
H
Haojun Liao 已提交
995
      if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
996
        pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
H
Haojun Liao 已提交
997
      } else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
998 999 1000
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
      }
    }
L
Liu Jicong 已提交
1001 1002

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;
1003
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
H
Haojun Liao 已提交
1004 1005 1006 1007

    mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
           pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
           pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
1008 1009 1010
  }

  taosWUnLockLatch(&pOldConsumer->lock);
L
Liu Jicong 已提交
1011 1012 1013
  return 0;
}

L
Liu Jicong 已提交
1014
/*
 * Look up a consumer by id in the SDB via sdbAcquire().
 * On a miss, returns NULL and sets terrno to TSDB_CODE_MND_CONSUMER_NOT_EXIST.
 * A non-NULL result should be handed back with mndReleaseConsumer().
 */
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerObj *pObj = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (pObj != NULL) {
    return pObj;
  }

  terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
1023
/* Hand a consumer obtained from mndAcquireConsumer() back to the SDB (sdbRelease). */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}
L
Liu Jicong 已提交
1027

S
Shengliang Guan 已提交
1028 1029
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
L
Liu Jicong 已提交
1030 1031 1032 1033 1034 1035
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
1036 1037 1038 1039
    if (pShow->pIter == NULL) {
      break;
    }

L
Liu Jicong 已提交
1040
    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
X
Xiaoyu Wang 已提交
1041
      mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
L
Liu Jicong 已提交
1042 1043 1044
      sdbRelease(pSdb, pConsumer);
      continue;
    }
L
Liu Jicong 已提交
1045 1046 1047

    taosRLockLatch(&pConsumer->lock);

X
Xiaoyu Wang 已提交
1048
    mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);
L
Liu Jicong 已提交
1049

L
Liu Jicong 已提交
1050 1051 1052 1053 1054 1055 1056
    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = true;
    if (topicSz == 0) {
      hasTopic = false;
      topicSz = 1;
    }

L
Liu Jicong 已提交
1057 1058 1059 1060
    if (numOfRows + topicSz > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
    }

L
Liu Jicong 已提交
1061 1062 1063 1064 1065 1066
    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // consumer id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1067
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);
L
Liu Jicong 已提交
1068

L
Liu Jicong 已提交
1069 1070
      // consumer group
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
1071 1072
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);

L
Liu Jicong 已提交
1073
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1074
      colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
L
Liu Jicong 已提交
1075

1076
      // client id
L
Liu Jicong 已提交
1077
      char clientId[256 + VARSTR_HEADER_SIZE] = {0};
1078 1079
      STR_TO_VARSTR(clientId, pConsumer->clientId);

L
Liu Jicong 已提交
1080
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1081
      colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);
L
Liu Jicong 已提交
1082 1083 1084

      // status
      char status[20 + VARSTR_HEADER_SIZE] = {0};
1085 1086 1087
      const char* pStatusName = mndConsumerStatusName(pConsumer->status);
      STR_TO_VARSTR(status, pStatusName);

L
Liu Jicong 已提交
1088
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1089
      colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
L
Liu Jicong 已提交
1090 1091 1092 1093 1094 1095

      // one subscribed topic
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (hasTopic) {
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
H
Haojun Liao 已提交
1096
        STR_TO_VARSTR(topic, topicName);
1097
        colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
L
Liu Jicong 已提交
1098
      } else {
1099
        colDataSetVal(pColInfo, numOfRows, NULL, true);
L
Liu Jicong 已提交
1100 1101 1102
      }

      // end point
L
Liu Jicong 已提交
1103
      /*pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);*/
1104
      /*colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);*/
L
Liu Jicong 已提交
1105 1106 1107

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1108
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);
L
Liu Jicong 已提交
1109 1110 1111

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1112
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);
L
Liu Jicong 已提交
1113 1114 1115

      // rebalance time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1116
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
L
Liu Jicong 已提交
1117 1118 1119

      numOfRows++;
    }
1120

L
Liu Jicong 已提交
1121 1122
    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);
1123 1124

    pBlock->info.rows = numOfRows;
L
Liu Jicong 已提交
1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

/* Abort an unfinished `show consumers` iteration by cancelling the SDB fetch cursor pIter. */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

/*
 * Map an MQ_CONSUMER_STATUS__* value to its display name:
 *   - READY maps to "ready";
 *   - LOST, LOST_REBD and LOST_IN_REB map to "lost";
 *   - MODIFY and MODIFY_IN_REB map to "rebalancing";
 *   - anything else maps to "unknown".
 * The returned string is a literal and must not be freed.
 */
static const char *mndConsumerStatusName(int status) {
  const char *name = "unknown";

  if (status == MQ_CONSUMER_STATUS__READY) {
    name = "ready";
  } else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_REBD ||
             status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
    name = "lost";
  } else if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
    name = "rebalancing";
  }

  return name;
}