mndConsumer.c 40.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
L
Liu Jicong 已提交
18
#include "mndPrivilege.h"
L
Liu Jicong 已提交
19
#include "mndShow.h"
20
#include "mndSubscribe.h"
L
Liu Jicong 已提交
21 22
#include "mndTopic.h"
#include "mndTrans.h"
L
Liu Jicong 已提交
23
#include "tcompare.h"
L
Liu Jicong 已提交
24 25
#include "tname.h"

26
#define MND_CONSUMER_VER_NUMBER   1
L
Liu Jicong 已提交
27 28
#define MND_CONSUMER_RESERVE_SIZE 64

L
Liu Jicong 已提交
29 30
#define MND_CONSUMER_LOST_HB_CNT          3
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
31

H
Haojun Liao 已提交
32
static int32_t mqRebInExecCnt = 0;
33

L
Liu Jicong 已提交
34 35
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
36 37 38
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
39 40
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
static int32_t mndRetrieveConsumer(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42

S
Shengliang Guan 已提交
43 44
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
45
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
46 47
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
L
Liu Jicong 已提交
48
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
49
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
50

L
Liu Jicong 已提交
51
/*
 * Set up the consumer module of the mnode.
 *
 * Registers the TMQ message handlers served by this file (subscribe, heartbeat,
 * ask-ep, timer, consumer lost/recover/clear) and the SHOW CONSUMERS
 * retrieve / free-iterator callbacks, then registers the SDB_CONSUMER table
 * (int64 key) with its encode/decode/insert/update/delete callbacks.
 *
 * Returns whatever sdbSetTable returns.
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  SSdbTable consumerTable = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };

  return sdbSetTable(pMnode->pSdb, consumerTable);
}

/*
 * Counterpart of mndInitConsumer. Init only registers handlers and the sdb
 * table, so there is nothing to release here.
 */
void mndCleanupConsumer(SMnode *pMnode) {
  (void)pMnode;
}

L
Liu Jicong 已提交
78
bool mndRebTryStart() {
H
Haojun Liao 已提交
79 80
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
  mInfo("tq timer, rebalance counter old val:%d", old);
L
Liu Jicong 已提交
81 82 83
  return old == 0;
}

H
Haojun Liao 已提交
84
/* Atomically decrement the rebalance counter and log the resulting value. */
void mndRebEnd() {
  int32_t remaining = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
  mInfo("rebalance end, rebalance counter:%d", remaining);
}
L
Liu Jicong 已提交
85

H
Haojun Liao 已提交
86
/* Atomically increment the rebalance counter and log the resulting value. */
void mndRebCntInc() {
  int32_t current = atomic_add_fetch_32(&mqRebInExecCnt, 1);
  mInfo("rebalance trans start, rebalance count:%d", current);
}
L
Liu Jicong 已提交
87

H
Haojun Liao 已提交
88
/* Atomically decrement the rebalance counter and log the resulting value. */
void mndRebCntDec() {
  int32_t current = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
  mInfo("rebalance trans end, rebalance count:%d", current);
}
L
Liu Jicong 已提交
89

S
Shengliang Guan 已提交
90 91 92
/*
 * Handle TDMT_MND_TMQ_CONSUMER_LOST, which the mq timer queues for a READY
 * consumer that has missed too many heartbeats.
 *
 * Commits a CONSUMER_UPDATE__LOST record for the consumer through a rollback
 * transaction.
 *
 * Returns 0 when the consumer no longer exists or the transaction was prepared.
 * Returns -1 with terrno set otherwise.
 */
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
  SMnode             *pMnode = pMsg->info.node;
  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
  if (pConsumer == NULL) {
    return 0;
  }

  mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
        mndConsumerStatusName(pConsumer->status));

  // Re-check: the status may have changed after the timer queued this message.
  if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
  if (pTrans != NULL && mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) == 0 &&
      mndTransPrepare(pMnode, pTrans) == 0) {
    code = 0;
  }

  // Single cleanup path for success and failure alike.
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
135 136 137
/*
 * Handle TDMT_MND_TMQ_CONSUMER_RECOVER, which is queued when a consumer in
 * LOST_REBD status shows up again (heartbeat or ask-ep).
 *
 * Commits a CONSUMER_UPDATE__RECOVER record through a retry transaction.
 *
 * Returns 0 when the transaction was prepared. Returns -1 with terrno set when
 * the consumer is unknown, is not in LOST_REBD status, or the transaction
 * cannot be built.
 */
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
  SMqConsumerObj        *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
  if (pConsumer == NULL) {
    mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
        pConsumer->status, mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
  if (pTrans != NULL && mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) == 0 &&
      mndTransPrepare(pMnode, pTrans) == 0) {
    code = 0;
  }

  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
/*
 * Handle TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, which the mq timer queues once a
 * LOST_REBD consumer has exceeded MND_CONSUMER_LOST_CLEAR_THRESHOLD ticks
 * without a heartbeat.
 *
 * Appends a DROPPED commit log for the consumer in a rollback transaction.
 *
 * Returns 0 when the consumer is already gone or the transaction was prepared.
 * Returns -1 with terrno set otherwise.
 */
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
  SMqConsumerObj      *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
  if (pConsumer == NULL) {
    return 0;
  }

  mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  // The consumer may have recovered after the timer queued this message.
  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  if (pTrans != NULL && mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) == 0 &&
      mndTransPrepare(pMnode, pTrans) == 0) {
    code = 0;
  }

  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
215
/*
 * Return the rebalance entry stored in pHash under the subscribe key `key`.
 * If no entry exists, an empty one is created and inserted first.
 *
 * The returned pointer refers to the copy that lives inside the hash table.
 * Returns NULL with terrno set when the entry cannot be created or inserted.
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, keyLen);
  if (pRebInfo != NULL) {
    return pRebInfo;
  }

  SMqRebInfo *pNew = tNewSMqRebSubscribe(key);
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (taosHashPut(pHash, key, keyLen, pNew, sizeof(SMqRebInfo)) != 0) {
    // Nothing references the new entry's arrays, so release them together with the wrapper.
    taosArrayDestroy(pNew->newConsumers);
    taosArrayDestroy(pNew->removedConsumers);
    taosMemoryFree(pNew);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // The hash keeps a shallow copy, so its entry now owns the arrays; only the wrapper is freed here.
  taosMemoryFree(pNew);
  return taosHashGet(pHash, key, keyLen);
}

S
Shengliang Guan 已提交
230 231
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
232 233 234 235
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

236 237
  mTrace("start to process mq timer");

238
  // rebalance cannot be parallel
L
Liu Jicong 已提交
239
  if (!mndRebTryStart()) {
240 241 242 243 244 245 246 247 248 249 250
    mInfo("mq rebalance already in progress, do nothing");
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  // TODO set cleanfp

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
251 252 253
    if (pIter == NULL) {
      break;
    }
254 255 256

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
H
Haojun Liao 已提交
257

X
Xiaoyu Wang 已提交
258 259 260
    mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", uptime:%" PRId64 ", hbstatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime,
           hbStatus);
L
Liu Jicong 已提交
261 262

    if (status == MQ_CONSUMER_STATUS__READY) {
H
Haojun Liao 已提交
263 264 265 266 267 268 269 270 271
      if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
        SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));

        pLostMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
            .msgType = TDMT_MND_TMQ_CONSUMER_LOST,
            .pCont = pLostMsg,
            .contLen = sizeof(SMqConsumerLostMsg),
        };
272

H
Haojun Liao 已提交
273 274
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
L
Liu Jicong 已提交
275
    } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
H
Haojun Liao 已提交
276
      // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
L
Liu Jicong 已提交
277 278 279 280 281 282 283 284 285
      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
        SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));

        pClearMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
            .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
            .pCont = pClearMsg,
            .contLen = sizeof(SMqConsumerClearMsg),
        };
286

L
Liu Jicong 已提交
287 288
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
289
    } else if (status == MQ_CONSUMER_STATUS__LOST) {
L
Liu Jicong 已提交
290
      taosRLockLatch(&pConsumer->lock);
291 292 293 294 295
      int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
      for (int32_t i = 0; i < topicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
296
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
297 298
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
299
      taosRUnLockLatch(&pConsumer->lock);
300
    } else if (status == MQ_CONSUMER_STATUS__MODIFY) {
L
Liu Jicong 已提交
301
      taosRLockLatch(&pConsumer->lock);
302 303 304 305 306
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
307
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
308 309 310 311 312 313 314 315
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
316
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
317 318
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
319
      taosRUnLockLatch(&pConsumer->lock);
320 321 322 323 324 325 326 327 328 329
    } else {
      // do nothing
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
L
Liu Jicong 已提交
330
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
331 332 333 334 335 336 337
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
H
Haojun Liao 已提交
338
    mInfo("mq rebalance finished, no modification");
L
Liu Jicong 已提交
339
    mndRebEnd();
340 341 342 343
  }
  return 0;
}

344
/*
 * Handle TDMT_MND_TMQ_HB.
 *
 * Resets the consumer's missed-heartbeat tick counter (hbStatus) to 0. If the
 * consumer is in LOST_REBD status, a TDMT_MND_TMQ_CONSUMER_RECOVER message is
 * queued for it.
 *
 * Returns 0 on success. Returns -1 with terrno set when the request cannot be
 * deserialized or the consumer is unknown.
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  SMnode  *pMnode = pMsg->info.node;
  SMqHbReq req = {0};

  if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t         consumerId = req.consumerId;
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " not exist", consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);
  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // Best effort: the next heartbeat in LOST_REBD status takes this branch again.
      mError("consumer:0x%" PRIx64 " failed to alloc recover msg", consumerId);
    } else {
      pRecoverMsg->consumerId = consumerId;
      SRpcMsg pRpcMsg = {
          .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
          .pCont = pRecoverMsg,
          .contLen = sizeof(SMqConsumerRecoverMsg),
      };
      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
    }
  }

  mndReleaseConsumer(pMnode, pConsumer);
  return 0;
}

S
Shengliang Guan 已提交
383
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
L
Liu Jicong 已提交
384 385 386
  SMnode     *pMnode = pMsg->info.node;
  SMqAskEpReq req = {0};
  SMqAskEpRsp rsp = {0};
D
dapan1121 已提交
387 388 389 390 391 392

  if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

L
Liu Jicong 已提交
393 394
  int64_t consumerId = req.consumerId;
  int32_t epoch = req.epoch;
395

396
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
397
  if (pConsumer == NULL) {
H
Haojun Liao 已提交
398
    mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
399 400 401 402
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

H
Haojun Liao 已提交
403 404 405 406 407 408 409
  int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
  if (ret != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }
410

411 412 413 414
  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);
L
Liu Jicong 已提交
415

L
Liu Jicong 已提交
416
#if 1
L
Liu Jicong 已提交
417
  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
X
Xiaoyu Wang 已提交
418
    mInfo("try to recover consumer:0x%" PRIx64, consumerId);
L
Liu Jicong 已提交
419 420 421
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));

    pRecoverMsg->consumerId = consumerId;
422
    SRpcMsg pRpcMsg = {
L
Liu Jicong 已提交
423
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
424 425 426
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
H
Haojun Liao 已提交
427

428
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
429
  }
430
#endif
431 432

  if (status != MQ_CONSUMER_STATUS__READY) {
X
Xiaoyu Wang 已提交
433
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
434 435 436 437 438 439
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

440
  // 2. check epoch, only send ep info when epochs do not match
441 442
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
X
Xiaoyu Wang 已提交
443 444
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
          serverEpoch);
445 446 447 448 449
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
450
      taosRUnLockLatch(&pConsumer->lock);
451 452 453
      goto FAIL;
    }

H
Haojun Liao 已提交
454
    // handle all topics subscribed by this consumer
455 456 457 458
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
      // txn guarantees pSub is created
L
Liu Jicong 已提交
459

460 461 462 463 464 465 466 467
      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      strcpy(topicEp.topic, topic);

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      taosRLockLatch(&pTopic->lock);
L
Liu Jicong 已提交
468
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
469
      topicEp.schema.nCols = pTopic->schema.nCols;
L
Liu Jicong 已提交
470 471 472 473
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
      }
474 475 476 477
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      // 2.2 iterate all vg assigned to the consumer of that topic
478 479
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      int32_t        vgNum = taosArrayGetSize(pConsumerEp->vgs);
480

H
Haojun Liao 已提交
481
      // this customer assigned vgroups
482 483 484
      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
485
        taosRUnLockLatch(&pConsumer->lock);
L
Liu Jicong 已提交
486 487
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
488 489 490 491
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
492
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510
        char     offsetKey[TSDB_PARTITION_KEY_LEN];
        mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }
H
Haojun Liao 已提交
511

512
  // encode rsp
L
Liu Jicong 已提交
513
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
514 515
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
516
    terrno = TSDB_CODE_OUT_OF_MEMORY;
517
    return -1;
L
Liu Jicong 已提交
518
  }
H
Haojun Liao 已提交
519

520 521 522
  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = serverEpoch;
  ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
S
Shengliang Guan 已提交
523

524
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
525
  tEncodeSMqAskEpRsp(&abuf, &rsp);
526 527

  // release consumer and free memory
L
Liu Jicong 已提交
528
  tDeleteSMqAskEpRsp(&rsp);
529 530 531
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
S
Shengliang Guan 已提交
532 533
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
534
  return 0;
H
Haojun Liao 已提交
535

536
FAIL:
L
Liu Jicong 已提交
537
  tDeleteSMqAskEpRsp(&rsp);
538 539 540 541
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

L
Liu Jicong 已提交
542 543 544 545 546 547 548 549
/*
 * Encode pConsumer and append it to pTrans as a commit log whose sdb status
 * is SDB_STATUS_DROPPED.
 *
 * Returns 0 on success and -1 on failure. If appending fails, the encoded raw
 * is freed here rather than leaked.
 */
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    // assumes a failed append does not keep the raw, so it is still ours to free
    sdbFreeRaw(pCommitRaw);
    return -1;
  }
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

550 551 552 553 554 555 556 557
/*
 * Encode pConsumer and append it to pTrans as a commit log whose sdb status
 * is SDB_STATUS_READY.
 *
 * Returns 0 on success and -1 on failure. If appending fails, the encoded raw
 * is freed here rather than leaked.
 */
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    // assumes a failed append does not keep the raw, so it is still ours to free
    sdbFreeRaw(pCommitRaw);
    return -1;
  }
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}

558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578
/*
 * Check every topic name in pTopicList: the topic must exist and pUser must
 * hold MND_OPER_SUBSCRIBE on it.
 *
 * Returns 0 when all topics pass, or -1 at the first topic that fails.
 */
static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const char* pUser) {
  int32_t total = taosArrayGetSize(pTopicList);

  for (int32_t idx = 0; idx < total; ++idx) {
    char        *pName = taosArrayGetP(pTopicList, idx);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pName);
    if (pTopic == NULL) {
      return -1;  // terrno has been set by mndAcquireTopic
    }

    int32_t denied = mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic);
    mndReleaseTopic(pMnode, pTopic);
    if (denied != 0) {
      return -1;
    }
  }

  return 0;
}

579 580 581 582
/* Element-copy callback for taosArrayDup: returns a heap copy of the topic name string p. */
static void* topicNameDup(void* p) {
  char *pName = (char *)p;
  return taosStrdup(pName);
}

583 584 585 586
/*
 * Handle TDMT_MND_TMQ_SUBSCRIBE.
 *
 * The requested topic list is sorted, de-duplicated and validated (each topic
 * must exist and be subscribable by the user). Then:
 *  - New consumer: a consumer object is committed with every requested topic
 *    marked for rebalance.
 *  - Existing READY consumer: its current topics and the requested topics are
 *    walked in order to find the topics to add and the topics to remove.
 *    This relies on currentTopics being kept sorted like the request list.
 *    If nothing changed, the request completes immediately.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when a transaction was prepared, 0 when
 * the subscription is unchanged, and -1 with terrno set on any failure.
 */
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  char   *msgStr = pMsg->pCont;

  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);

  uint64_t        consumerId = subscribe.consumerId;
  char           *cgroup = subscribe.cgroup;
  SMqConsumerObj *pExistedConsumer = NULL;
  SMqConsumerObj *pConsumerNew = NULL;
  STrans         *pTrans = NULL;

  // Failure unless a path below explicitly reports success / in-progress.
  // Previously this was overwritten with 0 by the topic validation, so later
  // failures were reported to the client as success.
  int32_t code = -1;

  SArray *pTopicList = subscribe.topicNames;
  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicateP(pTopicList, taosArrayCompareString, taosMemoryFree);

  int32_t newTopicNum = taosArrayGetSize(pTopicList);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
  if (pTrans == NULL) {
    goto _over;
  }

  // check topic existence and subscribe privilege
  if (validateTopics(pTopicList, pMnode, pMsg->info.conn.user) != TSDB_CODE_SUCCESS) {
    goto _over;
  }

  pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pExistedConsumer == NULL) {
    mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s", consumerId, subscribe.cgroup);

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _over;
    }
    tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    // all subscribed topics should re-balance; the request's list is handed over to pConsumerNew
    taosArrayDestroy(pConsumerNew->rebNewTopics);
    pConsumerNew->rebNewTopics = pTopicList;
    subscribe.topicNames = NULL;

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  } else {
    int32_t status = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive subscribe request from existed consumer:0x%" PRIx64 " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe.cgroup, status,mndConsumerStatusName(status), newTopicNum);

    if (status != MQ_CONSUMER_STATUS__READY) {
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto _over;
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _over;
    }

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    int32_t oldTopicNum = (pExistedConsumer->currentTopics) ? taosArrayGetSize(pExistedConsumer->currentTopics) : 0;

    // Walk both sorted lists in order. A topic only in the old list is removed;
    // a topic only in the new list is added.
    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      char *oldTopic = (i < oldTopicNum) ? taosArrayGetP(pExistedConsumer->currentTopics, i) : NULL;
      char *newTopic = (j < newTopicNum) ? taosArrayGetP(pTopicList, j) : NULL;
      int   comp = (oldTopic == NULL) ? 1 : ((newTopic == NULL) ? -1 : strcmp(oldTopic, newTopic));

      if (comp == 0) {
        i++;
        j++;
      } else if (comp < 0) {
        char *oldTopicCopy = taosStrdup(oldTopic);
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
      } else {
        char *newTopicCopy = taosStrdup(newTopic);
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
      }
    }

    // no topics need to be rebalanced: the subscription is unchanged, report success
    if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
      code = TSDB_CODE_SUCCESS;
      goto _over;
    }

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  mndTransDrop(pTrans);

  if (pExistedConsumer) {
    mndReleaseConsumer(pMnode, pExistedConsumer);
  }

  if (pConsumerNew) {
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
  }

  // TODO: replace with destroy subscribe msg
  taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
  return code;
}

L
Liu Jicong 已提交
720 721
/*
 * Serialize pConsumer into a newly allocated sdb raw record (type SDB_CONSUMER,
 * version MND_CONSUMER_VER_NUMBER). The record layout is:
 *  - an int32 payload length;
 *  - the tEncodeSMqConsumerObj payload;
 *  - MND_CONSUMER_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw on success. On failure it logs, frees the raw and returns
 * NULL with terrno set.
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  // Pessimistic default; cleared only after every step below has succeeded.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *pPayload = NULL;
  int32_t payloadLen = tEncodeSMqConsumerObj(NULL, pConsumer);  // NULL buffer: size query only
  int32_t rawSize = sizeof(int32_t) + payloadLen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    goto CONSUMER_ENCODE_OVER;
  }

  pPayload = taosMemoryMalloc(payloadLen);
  if (pPayload == NULL) {
    goto CONSUMER_ENCODE_OVER;
  }

  void *pCursor = pPayload;
  tEncodeSMqConsumerObj(&pCursor, pConsumer);

  int32_t offset = 0;
  SDB_SET_INT32(pRaw, offset, payloadLen, CONSUMER_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, offset, pPayload, payloadLen, CONSUMER_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, offset, MND_CONSUMER_RESERVE_SIZE, CONSUMER_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, offset, CONSUMER_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CONSUMER_ENCODE_OVER:
  taosMemoryFreeClear(pPayload);
  if (terrno != 0) {
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
  return pRaw;
}

L
Liu Jicong 已提交
756
/*
 * Decode an sdb raw record produced by mndConsumerActionEncode into a new sdb
 * row holding an SMqConsumerObj. The record is an int32 payload length, the
 * payload, and MND_CONSUMER_RESERVE_SIZE reserved bytes; its soft version must
 * equal MND_CONSUMER_VER_NUMBER.
 *
 * Returns the row on success. On failure it logs, frees the row and returns
 * NULL with terrno set.
 *
 * terrno now starts pessimistic and is reset to success once decoding
 * completes. Previously a stale non-zero terrno from an unrelated earlier call
 * made a successful decode free the row and return NULL.
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *buf = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }

  if (sver != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  int32_t dataPos = 0;
  int32_t len;
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

  terrno = TSDB_CODE_SUCCESS;

CM_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}

L
Liu Jicong 已提交
811
/*
 * SDB insert hook for a consumer row.
 * Logs the loaded object and initializes subscribeTime from upTime. Always returns 0.
 */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *pStatusName = mndConsumerStatusName(pConsumer->status);

  mDebug("consumer:0x%" PRIx64 " cgroup:%s status:%d(%s) epoch:%d load from sdb, perform insert action",
         pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, pStatusName, pConsumer->epoch);

  pConsumer->subscribeTime = pConsumer->upTime;
  return 0;
}

L
Liu Jicong 已提交
819
/*
 * SDB delete hook for a consumer row.
 * Logs the consumer's current status, then releases the object's owned resources via
 * tDeleteSMqConsumerObj(). Always returns 0.
 */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  int32_t curStatus = pConsumer->status;

  mDebug("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, curStatus,
         mndConsumerStatusName(curStatus));

  tDeleteSMqConsumerObj(pConsumer);
  return 0;
}

L
Liu Jicong 已提交
826
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
X
Xiaoyu Wang 已提交
827 828
  mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64,
         pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime);
L
Liu Jicong 已提交
829

830 831 832
  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
L
Liu Jicong 已提交
833 834
    /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
    /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
835

836 837 838 839 840 841 842 843 844 845
    if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
    } else {
      SArray *tmp = pOldConsumer->rebNewTopics;
      pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
      pNewConsumer->rebNewTopics = tmp;

      tmp = pOldConsumer->rebRemovedTopics;
      pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
      pNewConsumer->rebRemovedTopics = tmp;
846

847 848 849
      tmp = pOldConsumer->assignedTopics;
      pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
      pNewConsumer->assignedTopics = tmp;
L
Liu Jicong 已提交
850

851
      pOldConsumer->subscribeTime = pNewConsumer->upTime;
L
Liu Jicong 已提交
852

853 854
      pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
    }
855
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
L
Liu Jicong 已提交
856 857
    /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
    /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
L
Liu Jicong 已提交
858

859
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
L
Liu Jicong 已提交
860
    /*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/
861
    for (int32_t i = 0; i < sz; i++) {
862
      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
L
Liu Jicong 已提交
863
      taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
864
    }
L
Liu Jicong 已提交
865 866 867

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

H
Haojun Liao 已提交
868
    int32_t status = pOldConsumer->status;
869
    pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
H
Haojun Liao 已提交
870 871 872
    mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
           pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
           pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
L
Liu Jicong 已提交
873
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
L
Liu Jicong 已提交
874 875
    /*A(taosArrayGetSize(pOldConsumer->currentTopics) == 0);*/
    /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
L
Liu Jicong 已提交
876 877 878

    int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
879
      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
L
Liu Jicong 已提交
880 881 882 883 884 885
      taosArrayPush(pOldConsumer->rebNewTopics, &topic);
    }

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
886 887
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
L
Liu Jicong 已提交
888 889 890

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

891
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
L
Liu Jicong 已提交
892 893
    /*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);*/
    /*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);*/
894

895
    char *addedTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
896
    // not exist in current topic
X
Xiaoyu Wang 已提交
897

L
Liu Jicong 已提交
898
    bool existing = false;
899
#if 1
H
Haojun Liao 已提交
900 901
    int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics);
    for (int32_t i = 0; i < numOfExistedTopics; i++) {
902
      char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
L
Liu Jicong 已提交
903 904 905
      if (strcmp(topic, addedTopic) == 0) {
        existing = true;
      }
906 907 908 909 910 911 912 913 914 915 916 917 918 919
    }
#endif

    // remove from new topic
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
      if (strcmp(addedTopic, topic) == 0) {
        taosArrayRemove(pOldConsumer->rebNewTopics, i);
        taosMemoryFree(topic);
        break;
      }
    }

    // add to current topic
L
Liu Jicong 已提交
920 921 922
    if (!existing) {
      taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
M
Minglei Jin 已提交
923 924
    } else {
      taosMemoryFree(addedTopic);
L
Liu Jicong 已提交
925
    }
H
Haojun Liao 已提交
926

927
    // set status
H
Haojun Liao 已提交
928
    int32_t status = pOldConsumer->status;
929
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
H
Haojun Liao 已提交
930
      if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
931
        pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
H
Haojun Liao 已提交
932
      } else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
933 934 935
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
      }
    } else {
H
Haojun Liao 已提交
936
      if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
937
        pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
H
Haojun Liao 已提交
938
      } else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
939 940 941
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
      }
    }
L
Liu Jicong 已提交
942

H
Haojun Liao 已提交
943
    // the re-balance is triggered when the new consumer is launched.
L
Liu Jicong 已提交
944 945
    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

946
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
H
Haojun Liao 已提交
947 948 949
    mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
           pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
           pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
950
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
L
Liu Jicong 已提交
951 952
    /*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/
    /*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/
953 954 955
    char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);

    // not exist in new topic
L
Liu Jicong 已提交
956
#if 0
957 958
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
L
Liu Jicong 已提交
959
      A(strcmp(topic, removedTopic) != 0);
960 961 962 963 964
    }
#endif

    // remove from removed topic
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) {
L
fix  
Liu Jicong 已提交
965
      char *topic = taosArrayGetP(pOldConsumer->rebRemovedTopics, i);
966
      if (strcmp(removedTopic, topic) == 0) {
L
fix  
Liu Jicong 已提交
967
        taosArrayRemove(pOldConsumer->rebRemovedTopics, i);
968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984
        taosMemoryFree(topic);
        break;
      }
    }

    // remove from current topic
    int32_t i = 0;
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
    for (i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
      if (strcmp(removedTopic, topic) == 0) {
        taosArrayRemove(pOldConsumer->currentTopics, i);
        taosMemoryFree(topic);
        break;
      }
    }
    // must find the topic
L
Liu Jicong 已提交
985
    /*A(i < sz);*/
986 987

    // set status
H
Haojun Liao 已提交
988
    int32_t status = pOldConsumer->status;
989
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
H
Haojun Liao 已提交
990
      if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
991
        pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
H
Haojun Liao 已提交
992
      } else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
993 994 995
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
      }
    } else {
H
Haojun Liao 已提交
996
      if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
997
        pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
H
Haojun Liao 已提交
998
      } else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
999 1000 1001
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
      }
    }
L
Liu Jicong 已提交
1002 1003

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;
1004
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
H
Haojun Liao 已提交
1005 1006 1007 1008

    mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
           pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
           pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
1009 1010 1011
  }

  taosWUnLockLatch(&pOldConsumer->lock);
L
Liu Jicong 已提交
1012 1013 1014
  return 0;
}

L
Liu Jicong 已提交
1015
/*
 * Look up and acquire the consumer with the given id from the SDB.
 * Returns the acquired object (release with mndReleaseConsumer), or NULL with
 * terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST when no such consumer is found.
 */
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerObj *pObj = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (pObj != NULL) {
    return pObj;
  }

  terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
1024
/* Release a consumer previously obtained from mndAcquireConsumer. */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}
L
Liu Jicong 已提交
1028

S
Shengliang Guan 已提交
1029 1030
/*
 * Fill pBlock with rows for the consumer show command: one row per (consumer, assigned topic).
 * Consumers without assigned topics are skipped. Iteration state is kept in pShow->pIter so
 * the next call resumes where this one stopped.
 * Columns, in order: consumer id, cgroup, client id, status, topic, up time,
 * subscribe time, rebalance time (NULL when 0).
 * Returns the number of rows written and adds it to pShow->numOfRows.
 */
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    // assignedTopics is swapped under the write latch by the MODIFY update action, so it must
    // only be inspected while holding the read latch.
    taosRLockLatch(&pConsumer->lock);

    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    if (topicSz == 0) {
      mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      taosRUnLockLatch(&pConsumer->lock);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    // All rows of one consumer are emitted together, which may exceed the requested capacity.
    if (numOfRows + topicSz > rowsCapacity) {
      if (blockDataEnsureCapacity(pBlock, numOfRows + topicSz) != TSDB_CODE_SUCCESS) {
        // Writing rows without the extra room would overrun the column buffers.
        mError("showing consumer:0x%" PRIx64 " failed to enlarge result block since %s", pConsumer->consumerId,
               terrstr());
        taosRUnLockLatch(&pConsumer->lock);
        sdbRelease(pSdb, pConsumer);
        break;
      }
    }

    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // consumer id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);

      // consumer group
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);

      // client id
      char clientId[256 + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(clientId, pConsumer->clientId);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);

      // status
      char        status[20 + VARSTR_HEADER_SIZE] = {0};
      const char *pStatusName = mndConsumerStatusName(pConsumer->status);
      STR_TO_VARSTR(status, pStatusName);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)status, false);

      // one subscribed topic (topicSz > 0 is guaranteed under the latch above)
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
      STR_TO_VARSTR(topic, topicName);
      colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);

      // rebalance time; NULL until the first rebalance has been recorded
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);

      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);

    pBlock->info.rows = numOfRows;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

/* Abort an in-progress consumer iteration started by the show-retrieve path. */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

/*
 * Map a consumer status code to the user-visible label:
 * READY -> "ready"; LOST, LOST_REBD, LOST_IN_REB -> "lost";
 * MODIFY, MODIFY_IN_REB -> "rebalancing"; anything else -> "unknown".
 */
static const char *mndConsumerStatusName(int status) {
  const char *label = "unknown";

  switch (status) {
    case MQ_CONSUMER_STATUS__READY:
      label = "ready";
      break;
    case MQ_CONSUMER_STATUS__LOST:
    case MQ_CONSUMER_STATUS__LOST_REBD:
    case MQ_CONSUMER_STATUS__LOST_IN_REB:
      label = "lost";
      break;
    case MQ_CONSUMER_STATUS__MODIFY:
    case MQ_CONSUMER_STATUS__MODIFY_IN_REB:
      label = "rebalancing";
      break;
    default:
      break;
  }

  return label;
}