mndConsumer.c 39.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
21
#include "mndPrivilege.h"
L
Liu Jicong 已提交
22 23
#include "mndShow.h"
#include "mndStb.h"
24
#include "mndSubscribe.h"
L
Liu Jicong 已提交
25 26 27 28
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
29
#include "tcompare.h"
L
Liu Jicong 已提交
30 31
#include "tname.h"

32
#define MND_CONSUMER_VER_NUMBER   1
L
Liu Jicong 已提交
33 34
#define MND_CONSUMER_RESERVE_SIZE 64

L
Liu Jicong 已提交
35 36
#define MND_CONSUMER_LOST_HB_CNT          3
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
37

L
Liu Jicong 已提交
38
// Rebalance bookkeeping counter shared by the helpers in this file: mndRebTryStart() swaps it
// from 0 to 1 atomically, mndRebCntInc() increments it, mndRebEnd()/mndRebCntDec() decrement it.
static int8_t mqRebInExecCnt = 0;
39

L
Liu Jicong 已提交
40 41
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
42 43 44
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
45 46
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
static int32_t mndRetrieveConsumer(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
47
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
48

S
Shengliang Guan 已提交
49 50
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
51
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
52 53
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
L
Liu Jicong 已提交
54
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
55
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
56

L
Liu Jicong 已提交
57
/**
 * Wires the consumer module into an mnode: installs the TMQ message handlers,
 * the "show consumers" retrieve / free-iterator callbacks, and finally
 * registers the consumer table with the SDB.
 *
 * @param pMnode mnode being initialised
 * @return the result of sdbSetTable() for the consumer table
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  // every field not assigned below stays zero
  SSdbTable consumerTable = {0};
  consumerTable.sdbType = SDB_CONSUMER;
  consumerTable.keyType = SDB_KEY_INT64;
  consumerTable.encodeFp = (SdbEncodeFp)mndConsumerActionEncode;
  consumerTable.decodeFp = (SdbDecodeFp)mndConsumerActionDecode;
  consumerTable.insertFp = (SdbInsertFp)mndConsumerActionInsert;
  consumerTable.updateFp = (SdbUpdateFp)mndConsumerActionUpdate;
  consumerTable.deleteFp = (SdbDeleteFp)mndConsumerActionDelete;

  // requests arriving from clients
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);

  // the timer tick and the messages this file queues for itself
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // "show consumers" support
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  return sdbSetTable(pMnode->pSdb, consumerTable);
}

/** Counterpart of mndInitConsumer(); there is currently nothing to release. */
void mndCleanupConsumer(SMnode *pMnode) {
  (void)pMnode;  // intentionally unused
}

L
Liu Jicong 已提交
84
bool mndRebTryStart() {
L
Liu Jicong 已提交
85
  int8_t old = atomic_val_compare_exchange_8(&mqRebInExecCnt, 0, 1);
L
Liu Jicong 已提交
86 87 88
  return old == 0;
}

L
Liu Jicong 已提交
89
/** Atomically decrements mqRebInExecCnt by one. */
void mndRebEnd() {
  atomic_sub_fetch_8(&mqRebInExecCnt, 1);
}
L
Liu Jicong 已提交
90

L
Liu Jicong 已提交
91
/** Atomically increments mqRebInExecCnt by one. */
void mndRebCntInc() {
  atomic_add_fetch_8(&mqRebInExecCnt, 1);
}
L
Liu Jicong 已提交
92

L
Liu Jicong 已提交
93
/** Atomically decrements mqRebInExecCnt by one (same operation as mndRebEnd()). */
void mndRebCntDec() {
  atomic_sub_fetch_8(&mqRebInExecCnt, 1);
}
L
Liu Jicong 已提交
94

S
Shengliang Guan 已提交
95 96 97
/**
 * Handles TDMT_MND_TMQ_CONSUMER_LOST: commits a CONSUMER_UPDATE__LOST record
 * for a consumer that is currently in the READY status.
 *
 * @param pMsg message whose pCont is an SMqConsumerLostMsg
 * @return 0 when the consumer no longer exists or the transaction was prepared;
 *         -1 when the consumer is not READY, on allocation failure, or when the
 *         transaction could not be created / filled / prepared
 */
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
  SMnode             *pMnode = pMsg->info.node;
  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
  if (pConsumer == NULL) {
    return 0;
  }

  mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId,
        pConsumer->status, mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  if (pConsumerNew == NULL) {
    // allocation failed: bail out before the object is dereferenced
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  mndReleaseConsumer(pMnode, pConsumer);

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
  if (pTrans == NULL) {
    goto END;
  }

  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
    goto END;
  }

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    goto END;
  }

  code = 0;

END:
  // success and failure release exactly the same resources
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
140 141 142
/**
 * Handles TDMT_MND_TMQ_CONSUMER_RECOVER: commits a CONSUMER_UPDATE__RECOVER
 * record for a consumer that is currently in the LOST_REBD status.
 *
 * @param pMsg message whose pCont is an SMqConsumerRecoverMsg
 * @return 0 when the transaction was prepared; -1 (with terrno set where noted
 *         below) when the consumer does not exist, is not in LOST_REBD, on
 *         allocation failure, or when the transaction step fails
 */
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
  SMqConsumerObj        *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
  if (pConsumer == NULL) {
    // The consumer can be gone by the time this queued message is processed;
    // report it as an error instead of asserting and dereferencing NULL.
    mError("consumer:0x%" PRIx64 " not exist when processing recover msg", pRecoverMsg->consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
        pConsumer->status, mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  if (pConsumerNew == NULL) {
    // allocation failed: bail out before the object is dereferenced
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER;

  mndReleaseConsumer(pMnode, pConsumer);

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
  if (pTrans == NULL) {
    goto END;
  }

  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto END;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto END;

  code = 0;

END:
  // success and failure release exactly the same resources
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
/**
 * Handles TDMT_MND_TMQ_LOST_CONSUMER_CLEAR: appends drop logs (see
 * mndSetConsumerDropLogs) for a consumer that is in the LOST_REBD status.
 *
 * @param pMsg message whose pCont is an SMqConsumerClearMsg
 * @return 0 when the consumer no longer exists or the transaction was prepared;
 *         -1 when the consumer is not in LOST_REBD, on allocation failure, or
 *         when the transaction step fails
 */
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
  SMqConsumerObj      *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
  if (pConsumer == NULL) {
    return 0;
  }

  mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  if (pConsumerNew == NULL) {
    // allocation failed: bail out before the object is dereferenced
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  mndReleaseConsumer(pMnode, pConsumer);

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  if (pTrans == NULL) goto END;
  if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto END;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto END;

  code = 0;

END:
  // success and failure release exactly the same resources
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
216
/**
 * Looks up the rebalance info stored under `key`, creating and inserting a
 * fresh one when the key is not present yet.
 *
 * @param pHash table keyed by the subscribe key, including its terminating NUL
 * @param key   NUL-terminated subscribe key
 * @return the entry found in the table after the lookup/insert, or NULL when
 *         tNewSMqRebSubscribe() fails (terrno = TSDB_CODE_OUT_OF_MEMORY) or the
 *         final lookup finds nothing
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  // the terminating NUL is part of the hash key
  size_t keyLen = strlen(key) + 1;

  SMqRebInfo *pExisting = taosHashGet(pHash, key, keyLen);
  if (pExisting != NULL) {
    return pExisting;
  }

  SMqRebInfo *pFresh = tNewSMqRebSubscribe(key);
  if (pFresh == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // hand sizeof(SMqRebInfo) bytes to the table, drop the temporary, then return
  // whatever the table now holds under the key
  taosHashPut(pHash, key, keyLen, pFresh, sizeof(SMqRebInfo));
  taosMemoryFree(pFresh);
  return taosHashGet(pHash, key, keyLen);
}

S
Shengliang Guan 已提交
231 232
/**
 * Periodic TMQ housekeeping. Walks every consumer in the SDB, bumps its
 * heartbeat-miss counter and, depending on its status:
 *   - READY:     queues a CONSUMER_LOST message once the counter exceeds
 *                MND_CONSUMER_LOST_HB_CNT;
 *   - LOST_REBD: queues a LOST_CONSUMER_CLEAR message once the counter exceeds
 *                MND_CONSUMER_LOST_CLEAR_THRESHOLD;
 *   - LOST / MODIFY: records the consumer in the per-subscription rebalance
 *                lists.
 * When at least one subscription needs rebalancing, a DO_REBALANCE message
 * carrying the collected lists is queued; otherwise the lists are released and
 * the rebalance slot is given back via mndRebEnd().
 *
 * @param pMsg timer message; only pMsg->info.node is read
 * @return 0 normally (including "rebalance already running");
 *         -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the rebalance message
 *         or its hash table cannot be allocated
 */
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

  mTrace("start to process mq timer");

  // rebalance cannot be parallel
  if (!mndRebTryStart()) {
    mInfo("mq rebalance already in progress, do nothing");
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
  if (pRebMsg == NULL) {
    // give the slot back, otherwise later timer ticks would be blocked
    mndRebEnd();
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  if (pRebMsg->rebSubHash == NULL) {
    rpcFreeCont(pRebMsg);
    mndRebEnd();
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  // TODO set cleanfp

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);

    mDebug("check for consumer:0x%"PRIx64" status:%d(%s), sub-time:%"PRId64", uptime:%"PRId64,
        pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime);

    if (status == MQ_CONSUMER_STATUS__READY) {
      if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
        SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
        // On allocation failure nothing is sent; the counter keeps growing, so
        // the same condition holds again on the next tick.
        if (pLostMsg != NULL) {
          pLostMsg->consumerId = pConsumer->consumerId;
          SRpcMsg rpcMsg = {
              .msgType = TDMT_MND_TMQ_CONSUMER_LOST,
              .pCont = pLostMsg,
              .contLen = sizeof(SMqConsumerLostMsg),
          };
          tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
        }
      }
    } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
      // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
        SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
        // same best-effort policy as for the lost message above
        if (pClearMsg != NULL) {
          pClearMsg->consumerId = pConsumer->consumerId;
          SRpcMsg rpcMsg = {
              .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
              .pCont = pClearMsg,
              .contLen = sizeof(SMqConsumerClearMsg),
          };
          tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
        }
      }
    } else if (status == MQ_CONSUMER_STATUS__LOST) {
      // a lost consumer must be removed from every topic it currently holds
      taosRLockLatch(&pConsumer->lock);
      int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
      for (int32_t i = 0; i < topicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
        // mndGetOrCreateRebSub() returns NULL on allocation failure; skip the
        // entry rather than dereference it (the consumer status is unchanged,
        // so it is visited again on the next tick)
        if (pRebSub != NULL) {
          taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
        }
      }
      taosRUnLockLatch(&pConsumer->lock);
    } else if (status == MQ_CONSUMER_STATUS__MODIFY) {
      taosRLockLatch(&pConsumer->lock);
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
        if (pRebSub != NULL) {
          taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
        }
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
        if (pRebSub != NULL) {
          taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
        }
      }
      taosRUnLockLatch(&pConsumer->lock);
    } else {
      // do nothing
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    // pRebMsg (and the hash it carries) is handed to the queued message; the
    // rebalance slot stays claimed on this path
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
    mTrace("mq rebalance finished, no modification");
    mndRebEnd();
  }
  return 0;
}

342
/**
 * Handles a consumer heartbeat (TDMT_MND_TMQ_HB): resets the consumer's
 * heartbeat-miss counter and, if the consumer is in the LOST_REBD status,
 * queues a CONSUMER_RECOVER message for it.
 *
 * @param pMsg request whose pCont/contLen hold a serialized SMqHbReq
 * @return 0 on success; -1 with terrno set when the request cannot be
 *         deserialized, the consumer does not exist, or the recover message
 *         cannot be allocated
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  SMnode  *pMnode = pMsg->info.node;
  SMqHbReq req = {0};

  if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t         consumerId = req.consumerId;
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%"PRIx64 " not exist", consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  // a heartbeat arrived: restart the miss counter
  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer:0x%"PRIx64 "", consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // do not write through a failed allocation
      mndReleaseConsumer(pMnode, pConsumer);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    pRecoverMsg->consumerId = consumerId;
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }

  mndReleaseConsumer(pMnode, pConsumer);

  return 0;
}

S
Shengliang Guan 已提交
381
/**
 * Handles TDMT_MND_TMQ_ASK_EP: a consumer asks for the endpoints (vgroups) it
 * has been assigned for each of its current topics.
 *
 * Steps:
 *   1. look the consumer up and verify the consumer group in the request;
 *   2. reset the heartbeat-miss counter; queue a CONSUMER_RECOVER message when
 *      the consumer is in LOST_REBD;
 *   3. reject the request unless the consumer is READY;
 *   4. when the client epoch differs from the server epoch, collect per-topic
 *      schema and vgroup endpoints into the response;
 *   5. encode the response behind an SMqRspHead and attach it to pMsg->info.
 *
 * Every exit taken after the consumer has been acquired goes through FAIL or
 * the success tail, so the consumer reference is always released.
 *
 * @param pMsg request whose pCont/contLen hold a serialized SMqAskEpReq
 * @return 0 with pMsg->info.rsp/rspLen filled on success, -1 with terrno set
 *         on failure
 */
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  SMnode     *pMnode = pMsg->info.node;
  SMqAskEpReq req = {0};
  SMqAskEpRsp rsp = {0};

  if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t consumerId = req.consumerId;
  int32_t epoch = req.epoch;

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
  if (ret != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    goto FAIL;  // the consumer reference must still be released
  }

  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);

#if 1
  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer:0x%"PRIx64, consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    pRecoverMsg->consumerId = consumerId;
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };

    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
#endif

  if (status != MQ_CONSUMER_STATUS__READY) {
    mInfo("consumer:0x%"PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    goto FAIL;  // the consumer reference must still be released
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

  // 2. check epoch, only send ep info when epochs do not match
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch, serverEpoch);
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosRUnLockLatch(&pConsumer->lock);
      goto FAIL;
    }

    // handle all topics subscribed by this consumer
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);

      // txn guarantees pSub is created
      ASSERT(pSub);
      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      strcpy(topicEp.topic, topic);

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      ASSERT(pTopic);
      taosRLockLatch(&pTopic->lock);
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
      topicEp.schema.nCols = pTopic->schema.nCols;
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        if (topicEp.schema.pSchema == NULL) {
          // undo everything taken for this topic before leaving
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          taosRUnLockLatch(&pTopic->lock);
          mndReleaseTopic(pMnode, pTopic);
          taosRUnLockLatch(&pConsumer->lock);
          taosRUnLockLatch(&pSub->lock);
          mndReleaseSubscribe(pMnode, pSub);
          goto FAIL;
        }
        memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
      }
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      // 2.2 iterate all vg assigned to the consumer of that topic
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      int32_t        vgNum = taosArrayGetSize(pConsumerEp->vgs);

      // this customer assigned vgroups
      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        // topicEp has not been pushed into rsp.topics yet, so its schema copy
        // is not reachable from rsp and has to be freed here
        taosMemoryFreeClear(topicEp.schema.pSchema);
        taosRUnLockLatch(&pConsumer->lock);
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
        // NOTE(review): offsetKey is filled but never read afterwards - confirm whether it is still needed
        char     offsetKey[TSDB_PARTITION_KEY_LEN];
        mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }

  // encode rsp
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;  // frees rsp and releases the consumer
  }

  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = serverEpoch;
  ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;

  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  tEncodeSMqAskEpRsp(&abuf, &rsp);

  // release consumer and free memory
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
  return 0;

FAIL:
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

L
Liu Jicong 已提交
541 542 543 544 545 546 547 548
/**
 * Encodes `pConsumer`, appends the raw to the transaction's commit log and
 * marks it SDB_STATUS_DROPPED.
 *
 * @param pMnode    unused here
 * @param pTrans    transaction receiving the commit log
 * @param pConsumer consumer object to encode
 * @return 0 on success, -1 when encoding, appending or setting the status fails
 */
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    // NOTE(review): assumes a failed append leaves the raw owned by the caller - confirm
    // against mndTransAppendCommitlog before merging
    sdbFreeRaw(pCommitRaw);
    return -1;
  }
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

549 550 551 552 553 554 555 556
/**
 * Encodes `pConsumer`, appends the raw to the transaction's commit log and
 * marks it SDB_STATUS_READY.
 *
 * @param pMnode    unused here
 * @param pTrans    transaction receiving the commit log
 * @param pConsumer consumer object to encode
 * @return 0 on success, -1 when encoding, appending or setting the status fails
 */
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    // NOTE(review): assumes a failed append leaves the raw owned by the caller - confirm
    // against mndTransAppendCommitlog before merging
    sdbFreeRaw(pCommitRaw);
    return -1;
  }
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}

557 558 559 560
/**
 * Handles TDMT_MND_TMQ_SUBSCRIBE: a consumer declares the full set of topics
 * it wants to be subscribed to.
 *
 * The requested topic list is sorted and de-duplicated, every topic is checked
 * for existence and subscribe privilege, and then:
 *   - for an unknown consumer, a new consumer object is committed with all
 *     requested topics as rebNewTopics;
 *   - for an existing consumer (which must be READY), the requested list is
 *     merged against currentTopics to derive rebNewTopics / rebRemovedTopics,
 *     and the result is committed when anything changed.
 *
 * @param pMsg request whose pCont holds a serialized SCMSubscribeReq
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared,
 *         -1 on every other path
 */
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  char   *msgStr = pMsg->pCont;

  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);

  uint64_t        consumerId = subscribe.consumerId;
  char           *cgroup = subscribe.cgroup;
  SMqConsumerObj *pConsumerOld = NULL;
  SMqConsumerObj *pConsumerNew = NULL;

  int32_t code = -1;
  SArray *newSub = subscribe.topicNames;
  taosArraySort(newSub, taosArrayCompareString);
  taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree);

  int32_t newTopicNum = taosArrayGetSize(newSub);

  // check topic existence
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
  if (pTrans == NULL) {
    goto _over;
  }

  for (int32_t i = 0; i < newTopicNum; i++) {
    char        *topic = taosArrayGetP(newSub, i);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
    if (pTopic == NULL) {  // terrno has been set by callee function
      goto _over;
    }

    if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) {
      mndReleaseTopic(pMnode, pTopic);
      goto _over;
    }

    mndReleaseTopic(pMnode, pTopic);
  }

  pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumerOld == NULL) {
    mInfo("receive subscribe request from new consumer:%" PRId64, consumerId);

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      // same handling as in the existing-consumer branch below
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _over;
    }
    tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;

    // the request's topic array becomes the consumer's rebNewTopics; clear the
    // request field so the array is not destroyed twice at _over
    taosArrayDestroy(pConsumerNew->rebNewTopics);
    pConsumerNew->rebNewTopics = newSub;
    subscribe.topicNames = NULL;

    for (int32_t i = 0; i < newTopicNum; i++) {
      char *newTopicCopy = strdup(taosArrayGetP(newSub, i));
      taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
    }

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;

  } else {
    // NOTE: pConsumerOld->lock is not taken here (the lock/unlock calls were commented out in the original)

    int32_t status = atomic_load_32(&pConsumerOld->status);

    mInfo("receive subscribe request from existing consumer:%" PRId64 ", current status: %s, subscribe topic num: %d",
          consumerId, mndConsumerStatusName(status), newTopicNum);

    if (status != MQ_CONSUMER_STATUS__READY) {
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto _over;
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _over;
    }
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;

    for (int32_t i = 0; i < newTopicNum; i++) {
      char *newTopicCopy = strdup(taosArrayGetP(newSub, i));
      taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
    }

    int32_t oldTopicNum = 0;
    if (pConsumerOld->currentTopics) {
      oldTopicNum = taosArrayGetSize(pConsumerOld->currentTopics);
    }

    // Two-pointer merge over the old list (index i) and the requested list
    // (index j): topics only in the old list are removed, topics only in the
    // requested list are new, topics in both are left alone.
    // NOTE(review): newSub is sorted with taosArrayCompareString but the merge
    // compares with compareLenPrefixedStr - confirm both orderings agree.
    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
        char *newTopicCopy = strdup(taosArrayGetP(newSub, j));
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
        continue;
      } else if (j >= newTopicNum) {
        char *oldTopicCopy = strdup(taosArrayGetP(pConsumerOld->currentTopics, i));
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
        continue;
      } else {
        char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i);
        char *newTopic = taosArrayGetP(newSub, j);
        int   comp = compareLenPrefixedStr(oldTopic, newTopic);
        if (comp == 0) {
          i++;
          j++;
          continue;
        } else if (comp < 0) {
          char *oldTopicCopy = strdup(oldTopic);
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
          continue;
        } else {
          char *newTopicCopy = strdup(newTopic);
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
          continue;
        }
      }
    }

    if (pConsumerOld && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 &&
        taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
      // nothing changed, so no transaction is prepared
      // NOTE(review): this path leaves with code == -1 and terrno untouched - confirm that is intended
      goto _over;
    }

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  mndTransDrop(pTrans);

  if (pConsumerOld) {
    mndReleaseConsumer(pMnode, pConsumerOld);
  }
  if (pConsumerNew) {
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
  }
  // TODO: replace with destroy subscribe msg
  if (subscribe.topicNames) taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
  return code;
}

L
Liu Jicong 已提交
710 711
/**
 * Serializes a consumer object into a freshly allocated SDB raw.
 *
 * Raw layout, in order: int32 length of the encoded object, the encoded
 * bytes, MND_CONSUMER_RESERVE_SIZE reserved bytes.
 *
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and only switched to
 * TSDB_CODE_SUCCESS after every step has succeeded, so any early jump to
 * CM_ENCODE_OVER is reported as a failure.
 *
 * @param pConsumer consumer object to encode
 * @return the raw on success, NULL on failure (terrno holds the reason)
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void *pEncoded = NULL;

  // first pass with a NULL buffer only measures the encoded size
  int32_t encodedLen = tEncodeSMqConsumerObj(NULL, pConsumer);
  int32_t rawSize = sizeof(int32_t) + encodedLen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto CM_ENCODE_OVER;

  pEncoded = taosMemoryMalloc(encodedLen);
  if (pEncoded == NULL) goto CM_ENCODE_OVER;

  // second pass writes through a cursor so pEncoded keeps pointing at the start
  void *pCursor = pEncoded;
  tEncodeSMqConsumerObj(&pCursor, pConsumer);

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, encodedLen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pEncoded, encodedLen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(pEncoded);

  if (terrno == 0) {
    mTrace("consumer:%" PRId64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
    return pRaw;
  }

  mError("consumer:%" PRId64 ", failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

L
Liu Jicong 已提交
746
/**
 * Rebuilds a consumer row from an SDB raw produced by mndConsumerActionEncode:
 * int32 length, encoded bytes, MND_CONSUMER_RESERVE_SIZE reserved bytes.
 *
 * Like the encoder, terrno is preset to a failure code and only cleared once
 * every step has succeeded. Without that, the final `terrno != SUCCESS` test
 * would act on whatever value an earlier, unrelated call left behind and could
 * discard a correctly decoded row.
 *
 * @param pRaw raw to decode
 * @return the decoded row on success, NULL on failure (terrno holds the reason)
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *buf = NULL;

  // default failure reason for steps that do not set terrno themselves
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }

  if (sver != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  int32_t dataPos = 0;
  int32_t len;
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

  // every step succeeded
  terrno = TSDB_CODE_SUCCESS;

CM_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}

L
Liu Jicong 已提交
801
/*
 * SDB insert action for a consumer object.
 *
 * Logs the consumer's identity and state, then sets subscribeTime to the
 * consumer's upTime. pSdb is not used. Always returns 0.
 */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *statusName = mndConsumerStatusName(pConsumer->status);

  mDebug("consumer:0x%" PRIx64 " cgroup:%s status:%d(%s) epoch:%d load from sdb, perform insert action",
         pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, statusName, pConsumer->epoch);

  pConsumer->subscribeTime = pConsumer->upTime;

  return 0;
}

L
Liu Jicong 已提交
809
/*
 * SDB delete action for a consumer object.
 *
 * Logs the deletion and hands the object to tDeleteSMqConsumerObj().
 * pSdb is not used. Always returns 0.
 */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *statusName = mndConsumerStatusName(pConsumer->status);

  mDebug("consumer:0x%" PRIx64 " perform delete action, status:%s", pConsumer->consumerId, statusName);

  tDeleteSMqConsumerObj(pConsumer);

  return 0;
}

L
Liu Jicong 已提交
815
/*
 * Re-derive a consumer's status after one topic has been added to or removed
 * from it during a rebalance.
 *
 * If no topics are left in rebNewTopics/rebRemovedTopics:
 *   MODIFY / MODIFY_IN_REB -> READY
 *   LOST   / LOST_IN_REB   -> LOST_REBD
 * otherwise:
 *   MODIFY / MODIFY_IN_REB -> MODIFY_IN_REB
 *   LOST   / LOST_IN_REB   -> LOST_IN_REB
 * Any other status is left untouched.
 *
 * Returns the status the consumer had before the call (used for logging).
 */
static int32_t mndConsumerSettleRebStatus(SMqConsumerObj *pConsumer) {
  int32_t prevStatus = pConsumer->status;

  bool rebFinished =
      (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0);

  if (prevStatus == MQ_CONSUMER_STATUS__MODIFY || prevStatus == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
    pConsumer->status = rebFinished ? MQ_CONSUMER_STATUS__READY : MQ_CONSUMER_STATUS__MODIFY_IN_REB;
  } else if (prevStatus == MQ_CONSUMER_STATUS__LOST || prevStatus == MQ_CONSUMER_STATUS__LOST_IN_REB) {
    pConsumer->status = rebFinished ? MQ_CONSUMER_STATUS__LOST_REBD : MQ_CONSUMER_STATUS__LOST_IN_REB;
  }

  return prevStatus;
}

/*
 * SDB update action for a consumer object.
 *
 * Applies the change described by pNewConsumer->updateType to pOldConsumer
 * while holding pOldConsumer's write latch:
 *
 *   CONSUMER_UPDATE__MODIFY  - swap in the new rebNewTopics / rebRemovedTopics /
 *                              assignedTopics arrays (or go straight to READY
 *                              when both rebalance lists are empty).
 *   CONSUMER_UPDATE__LOST    - queue every current topic for removal and mark
 *                              the consumer LOST.
 *   CONSUMER_UPDATE__RECOVER - queue every assigned topic for (re)addition and
 *                              mark the consumer MODIFY.
 *   CONSUMER_UPDATE__TOUCH   - bump the epoch and refresh rebalanceTime.
 *   CONSUMER_UPDATE__ADD     - move one topic from rebNewTopics to
 *                              currentTopics, bump the epoch.
 *   CONSUMER_UPDATE__REMOVE  - drop one topic from rebRemovedTopics and
 *                              currentTopics, bump the epoch.
 *
 * pSdb is not used. Always returns 0.
 */
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64,
         pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime);

  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
    ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
    ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);

    if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
    } else {
      // Exchange the arrays instead of copying them: the old arrays end up in
      // pNewConsumer.
      SArray *tmp = pOldConsumer->rebNewTopics;
      pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
      pNewConsumer->rebNewTopics = tmp;

      tmp = pOldConsumer->rebRemovedTopics;
      pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
      pNewConsumer->rebRemovedTopics = tmp;

      tmp = pOldConsumer->assignedTopics;
      pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
      pNewConsumer->assignedTopics = tmp;

      pOldConsumer->subscribeTime = pNewConsumer->upTime;

      pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
    }
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
    ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
    ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);

    // every topic currently consumed has to be taken away by the rebalance
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = strdup(taosArrayGetP(pOldConsumer->currentTopics, i));
      taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
    }

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    int32_t status = pOldConsumer->status;
    pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
    mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
           pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
           pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
    ASSERT(taosArrayGetSize(pOldConsumer->currentTopics) == 0);
    ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);

    // every assigned topic has to be handed back by the rebalance
    int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = strdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
      taosArrayPush(pOldConsumer->rebNewTopics, &topic);
    }

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
    ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);
    ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);

    // private copy: on success its ownership moves into currentTopics
    char *addedTopic = strdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));

    // is the topic already part of the current topics?
    bool    existing = false;
    int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics);
    for (int32_t i = 0; i < numOfExistedTopics; i++) {
      char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
      if (strcmp(topic, addedTopic) == 0) {
        existing = true;
        break;
      }
    }

    // remove from new topic
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
      if (strcmp(addedTopic, topic) == 0) {
        taosArrayRemove(pOldConsumer->rebNewTopics, i);
        taosMemoryFree(topic);
        break;
      }
    }

    // add to current topic
    if (!existing) {
      taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
    } else {
      // the copy was not stored anywhere, release it to avoid a leak
      taosMemoryFree(addedTopic);
    }

    // set status
    int32_t status = mndConsumerSettleRebStatus(pOldConsumer);

    // the re-balance is triggered when the new consumer is launched.
    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
    mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
           pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
           pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
    ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);
    ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);
    // still owned by pNewConsumer, only used for comparisons here
    char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);

    // the topic must not be pending in the new-topic list
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
      ASSERT(strcmp(topic, removedTopic) != 0);
    }

    // remove from removed topic
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->rebRemovedTopics, i);
      if (strcmp(removedTopic, topic) == 0) {
        taosArrayRemove(pOldConsumer->rebRemovedTopics, i);
        taosMemoryFree(topic);
        break;
      }
    }

    // remove from current topic
    int32_t i = 0;
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
    for (i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
      if (strcmp(removedTopic, topic) == 0) {
        taosArrayRemove(pOldConsumer->currentTopics, i);
        taosMemoryFree(topic);
        break;
      }
    }
    // must find the topic
    ASSERT(i < sz);

    // set status
    int32_t status = mndConsumerSettleRebStatus(pOldConsumer);

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
           pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
           pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
  }

  taosWUnLockLatch(&pOldConsumer->lock);
  return 0;
}

L
Liu Jicong 已提交
1001
/*
 * Look up a consumer by id in the mnode's SDB via sdbAcquire().
 *
 * Returns the acquired object, or NULL with terrno set to
 * TSDB_CODE_MND_CONSUMER_NOT_EXIST when sdbAcquire() returns NULL.
 * A non-NULL result is handed back with mndReleaseConsumer().
 */
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerObj *pFound = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
1010
/*
 * Hand a consumer object back to the mnode's SDB via sdbRelease().
 */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}
L
Liu Jicong 已提交
1014

S
Shengliang Guan 已提交
1015 1016
/*
 * Fill pBlock with rows describing consumers, one row per (consumer, assigned
 * topic) pair.
 *
 * Iteration resumes from pShow->pIter and stops when the SDB iterator is
 * exhausted or at least rowsCapacity rows have been produced. A consumer's
 * rows are always emitted together; if they do not fit, the block is asked to
 * grow via blockDataEnsureCapacity(). Consumers without assigned topics are
 * skipped.
 *
 * Columns, in order: consumer id, consumer group, client id, status, topic,
 * up time, subscribe time, rebalance time (NULL when it is 0).
 *
 * Returns the number of rows written by this call and adds it to
 * pShow->numOfRows.
 */
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    // assignedTopics is replaced under the write latch by the update action,
    // so it must only be inspected while the read latch is held.
    taosRLockLatch(&pConsumer->lock);

    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    if (topicSz == 0) {
      mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      taosRUnLockLatch(&pConsumer->lock);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    // NOTE(review): the result of blockDataEnsureCapacity() is not checked,
    // same as before - confirm whether a failure has to be handled here.
    if (numOfRows + topicSz > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
    }

    // Values that are identical for every row of this consumer are prepared
    // once, outside the per-topic loop.
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(cgroup), pConsumer->cgroup, TSDB_CGROUP_LEN);
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

    char clientId[256 + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(clientId), pConsumer->clientId, 256);
    varDataSetLen(clientId, strlen(varDataVal(clientId)));

    char status[20 + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20);
    varDataSetLen(status, strlen(varDataVal(status)));

    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // consumer id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);

      // consumer group
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);

      // client id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);

      // status
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)status, false);

      // one subscribed topic
      char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
      STR_TO_VARSTR(topic, topicName);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);

      // the end point (pConsumer->ep) column is currently not emitted

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);

      // rebalance time, shown as NULL while no rebalance has been recorded
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);

      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

/*
 * Cancel an in-progress SDB iteration by passing pIter to sdbCancelFetch().
 */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

/*
 * Map a consumer status code to the name used in logs and show results.
 *
 *   READY                          -> "ready"
 *   LOST, LOST_REBD, LOST_IN_REB   -> "lost"
 *   MODIFY, MODIFY_IN_REB          -> "rebalancing"
 *   anything else                  -> "unknown"
 *
 * The returned pointer refers to a string literal and must not be freed.
 */
static const char *mndConsumerStatusName(int status) {
  if (status == MQ_CONSUMER_STATUS__READY) {
    return "ready";
  }

  if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_REBD ||
      status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
    return "lost";
  }

  if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
    return "rebalancing";
  }

  return "unknown";
}