mndConsumer.c 40.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
L
Liu Jicong 已提交
18
#include "mndPrivilege.h"
L
Liu Jicong 已提交
19
#include "mndShow.h"
20
#include "mndSubscribe.h"
L
Liu Jicong 已提交
21 22
#include "mndTopic.h"
#include "mndTrans.h"
L
Liu Jicong 已提交
23
#include "tcompare.h"
L
Liu Jicong 已提交
24 25
#include "tname.h"

26
#define MND_CONSUMER_VER_NUMBER   1
L
Liu Jicong 已提交
27 28
#define MND_CONSUMER_RESERVE_SIZE 64

H
Haojun Liao 已提交
29
#define MND_CONSUMER_LOST_HB_CNT          6
L
Liu Jicong 已提交
30
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
31

H
Haojun Liao 已提交
32
static int32_t mqRebInExecCnt = 0;
33

L
Liu Jicong 已提交
34 35
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
36 37
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
38
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
39
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
40
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42

S
Shengliang Guan 已提交
43 44
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
45
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
46 47
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
L
Liu Jicong 已提交
48
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
49
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
50

L
Liu Jicong 已提交
51
/*
 * Wire up the consumer module: register its message handlers, the `show consumers` retrieve and
 * iterator-cancel handlers, and finally the SDB_CONSUMER table with its encode/decode and
 * insert/update/delete callbacks.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  // requests handled by this module
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);

  // consumer state transitions; these messages are put on the write queue by this module itself
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // `show consumers`
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  SSdbTable consumerTable = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, consumerTable);
}

// Counterpart of mndInitConsumer(); that function allocates nothing this module must free, so there is nothing to do.
void mndCleanupConsumer(SMnode *pMnode) { (void)pMnode; }

L
Liu Jicong 已提交
78
bool mndRebTryStart() {
H
Haojun Liao 已提交
79
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
C
cadem 已提交
80
  mDebug("tq timer, rebalance counter old val:%d", old);
L
Liu Jicong 已提交
81 82 83
  return old == 0;
}

X
Xiaoyu Wang 已提交
84
// Release the rebalance slot claimed by mndRebTryStart() (one decrement of the in-flight counter).
void mndRebEnd() {
  mndRebCntDec();
}
L
Liu Jicong 已提交
85

S
Shengliang Guan 已提交
86 87
// Atomically bump the in-flight rebalance counter and log its new value.
void mndRebCntInc() {
  int32_t counter = atomic_add_fetch_32(&mqRebInExecCnt, 1);
  mInfo("rebalance trans start, rebalance counter:%d", counter);
}
L
Liu Jicong 已提交
90

S
Shengliang Guan 已提交
91
void mndRebCntDec() {
92 93 94 95 96 97 98 99 100 101
  while (1) {
    int32_t val = atomic_load_32(&mqRebInExecCnt);
    if (val <= 0) {
      mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
      break;
    }

    int32_t newVal = val - 1;
    int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
    if (oldVal == val) {
C
cadem 已提交
102
      mDebug("rebalance trans end, rebalance counter:%d", newVal);
103 104 105
      break;
    }
  }
S
Shengliang Guan 已提交
106
}
L
Liu Jicong 已提交
107

S
Shengliang Guan 已提交
108 109 110
/*
 * Handle TDMT_MND_TMQ_CONSUMER_LOST, which the mq timer queues when a READY consumer has missed too
 * many heartbeats: commit a CONSUMER_UPDATE__LOST update for that consumer through a transaction.
 *
 * Returns 0 when the consumer no longer exists or the transaction was prepared. Returns -1 with
 * terrno set when:
 *  - the consumer is not READY (TSDB_CODE_MND_CONSUMER_NOT_READY);
 *  - the update object cannot be allocated;
 *  - the transaction cannot be built or prepared.
 */
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
  SMnode             *pMnode = pMsg->info.node;
  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
  if (pConsumer == NULL) {
    return 0;
  }

  mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;  // same code the recover handler reports on a status mismatch
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
  if (pTrans == NULL) goto _over;
  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  code = 0;

_over:
  // the update object is only a template for the commit log, so it is freed on every path
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
153 154 155
/*
 * Handle TDMT_MND_TMQ_CONSUMER_RECOVER, which is queued when a LOST_REBD consumer shows up again
 * (heartbeat or ask-ep): commit a CONSUMER_UPDATE__RECOVER update for it through a transaction.
 *
 * Returns 0 when the transaction was prepared. Returns -1 when:
 *  - the consumer cannot be found;
 *  - the consumer is not LOST_REBD (terrno = TSDB_CODE_MND_CONSUMER_NOT_READY);
 *  - the update object cannot be allocated;
 *  - the transaction cannot be built or prepared.
 */
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
  SMqConsumerObj        *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
  if (pConsumer == NULL) {
    mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
    return -1;
  }

  mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
        pConsumer->status, mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
  if (pTrans == NULL) goto _over;
  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  code = 0;

_over:
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

195
// todo check the clear process
L
Liu Jicong 已提交
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
  SMqConsumerObj      *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
  if (pConsumer == NULL) {
    return 0;
  }

  mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  mndReleaseConsumer(pMnode, pConsumer);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  if (pTrans == NULL) goto FAIL;
  if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;

  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return 0;
226

L
Liu Jicong 已提交
227 228 229 230 231 232 233
FAIL:
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return -1;
}

L
Liu Jicong 已提交
234
/*
 * Look up the rebalance entry for subscribe key `key` in pHash, creating an empty one if absent.
 *
 * The new SMqRebInfo is copied by value into the hash. The hash therefore takes over the arrays it
 * points to, and only the temporary shell is freed here.
 *
 * Returns a pointer to the entry stored inside the hash. Returns NULL on failure, in which case
 * nothing is leaked; terrno is set to TSDB_CODE_OUT_OF_MEMORY when the entry cannot be allocated.
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, keyLen);
  if (pRebInfo != NULL) {
    return pRebInfo;
  }

  SMqRebInfo *pNew = tNewSMqRebSubscribe(key);
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (taosHashPut(pHash, key, keyLen, pNew, sizeof(SMqRebInfo)) != 0) {
    // the hash did not take ownership: release the arrays as well as the shell
    taosArrayDestroy(pNew->newConsumers);
    taosArrayDestroy(pNew->removedConsumers);
    taosMemoryFree(pNew);
    return NULL;
  }

  taosMemoryFree(pNew);
  return taosHashGet(pHash, key, keyLen);
}

X
Xiaoyu Wang 已提交
249 250
static void freeRebalanceItem(void *param) {
  SMqRebInfo *pInfo = param;
H
Haojun Liao 已提交
251 252 253 254
  taosArrayDestroy(pInfo->newConsumers);
  taosArrayDestroy(pInfo->removedConsumers);
}

S
Shengliang Guan 已提交
255 256
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
257 258 259 260
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

D
dapan1121 已提交
261
  mDebug("start to process mq timer");
262

263
  // rebalance cannot be parallel
L
Liu Jicong 已提交
264
  if (!mndRebTryStart()) {
D
dapan1121 已提交
265
    mDebug("mq rebalance already in progress, do nothing");
266 267 268 269
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
270
  if (pRebMsg == NULL) {
X
Xiaoyu Wang 已提交
271
    mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
272 273 274 275
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

276
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
H
Haojun Liao 已提交
277 278 279 280 281 282 283 284
  if (pRebMsg->rebSubHash == NULL) {
    mError("failed to create rebalance hashmap");
    rpcFreeCont(pRebMsg);
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
285 286 287 288

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
289 290 291
    if (pIter == NULL) {
      break;
    }
292 293 294

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
H
Haojun Liao 已提交
295

X
Xiaoyu Wang 已提交
296 297 298
    mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", uptime:%" PRId64 ", hbstatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime,
           hbStatus);
L
Liu Jicong 已提交
299 300

    if (status == MQ_CONSUMER_STATUS__READY) {
H
Haojun Liao 已提交
301 302 303 304 305
      if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
        SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));

        pLostMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
306
            .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)};
307

308 309
        mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId,
            MND_CONSUMER_LOST_HB_CNT);
H
Haojun Liao 已提交
310 311
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
L
Liu Jicong 已提交
312
    } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
H
Haojun Liao 已提交
313
      // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
L
Liu Jicong 已提交
314 315 316 317 318
      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
        SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));

        pClearMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
319
            .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};
320

321 322
        mDebug("consumer:0x%" PRIx64 " lost beyond threshold %d, clear it", pConsumer->consumerId,
               MND_CONSUMER_LOST_CLEAR_THRESHOLD);
L
Liu Jicong 已提交
323 324
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
325
    } else if (status == MQ_CONSUMER_STATUS__LOST) {
L
Liu Jicong 已提交
326
      taosRLockLatch(&pConsumer->lock);
327 328 329 330 331
      int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
      for (int32_t i = 0; i < topicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
332
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
333 334
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
335
      taosRUnLockLatch(&pConsumer->lock);
336
    } else {  // MQ_CONSUMER_STATUS_REBALANCE
L
Liu Jicong 已提交
337
      taosRLockLatch(&pConsumer->lock);
338

339 340 341 342 343
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
344
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
345 346 347 348 349 350 351 352
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
353
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
354 355
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
356
      taosRUnLockLatch(&pConsumer->lock);
357 358 359 360 361 362 363 364
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
L
Liu Jicong 已提交
365
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
366 367 368 369 370 371 372
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
H
Haojun Liao 已提交
373
    mDebug("mq timer finished, no need to re-balance");
L
Liu Jicong 已提交
374
    mndRebEnd();
375 376 377 378
  }
  return 0;
}

379
/*
 * Handle a consumer heartbeat (TDMT_MND_TMQ_HB): reset the consumer's missed-heartbeat counter and,
 * if the consumer is LOST_REBD, queue a CONSUMER_RECOVER msg for it.
 *
 * Returns 0 on success. Returns -1 with terrno set when the request cannot be decoded or the
 * consumer does not exist.
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  SMnode  *pMnode = pMsg->info.node;
  SMqHbReq req = {0};

  if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t         consumerId = req.consumerId;
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " not exist", consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // best effort: the status stays LOST_REBD, so the next heartbeat retries the recovery
      mError("consumer:0x%" PRIx64 " failed to malloc consumer recover msg", consumerId);
    } else {
      pRecoverMsg->consumerId = consumerId;
      SRpcMsg rpcMsg = {
          .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
          .pCont = pRecoverMsg,
          .contLen = sizeof(SMqConsumerRecoverMsg),
      };
      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
    }
  }

  mndReleaseConsumer(pMnode, pConsumer);

  return 0;
}

S
Shengliang Guan 已提交
418
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
L
Liu Jicong 已提交
419 420 421
  SMnode     *pMnode = pMsg->info.node;
  SMqAskEpReq req = {0};
  SMqAskEpRsp rsp = {0};
D
dapan1121 已提交
422 423 424 425 426 427

  if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

L
Liu Jicong 已提交
428 429
  int64_t consumerId = req.consumerId;
  int32_t epoch = req.epoch;
430

431
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
432
  if (pConsumer == NULL) {
H
Haojun Liao 已提交
433
    mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
434 435 436 437
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

H
Haojun Liao 已提交
438 439 440 441 442 443 444
  int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
  if (ret != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }
445

446 447 448 449
  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);
L
Liu Jicong 已提交
450

L
Liu Jicong 已提交
451
#if 1
L
Liu Jicong 已提交
452
  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
X
Xiaoyu Wang 已提交
453
    mInfo("try to recover consumer:0x%" PRIx64, consumerId);
L
Liu Jicong 已提交
454 455 456
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));

    pRecoverMsg->consumerId = consumerId;
457
    SRpcMsg pRpcMsg = {
L
Liu Jicong 已提交
458
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
459 460 461
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
H
Haojun Liao 已提交
462

463
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
464
  }
465
#endif
466 467

  if (status != MQ_CONSUMER_STATUS__READY) {
X
Xiaoyu Wang 已提交
468
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
469 470 471 472 473 474
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

475
  // 2. check epoch, only send ep info when epochs do not match
476 477
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
X
Xiaoyu Wang 已提交
478 479
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
          serverEpoch);
480 481 482 483 484
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
485
      taosRUnLockLatch(&pConsumer->lock);
486 487 488
      goto FAIL;
    }

H
Haojun Liao 已提交
489
    // handle all topics subscribed by this consumer
490 491 492 493
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
      // txn guarantees pSub is created
L
Liu Jicong 已提交
494

495 496 497 498 499 500 501 502
      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      strcpy(topicEp.topic, topic);

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      taosRLockLatch(&pTopic->lock);
L
Liu Jicong 已提交
503
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
504
      topicEp.schema.nCols = pTopic->schema.nCols;
L
Liu Jicong 已提交
505 506 507 508
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
      }
509 510 511 512
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      // 2.2 iterate all vg assigned to the consumer of that topic
513 514
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      int32_t        vgNum = taosArrayGetSize(pConsumerEp->vgs);
515

H
Haojun Liao 已提交
516
      // this customer assigned vgroups
517 518 519
      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
520
        taosRUnLockLatch(&pConsumer->lock);
L
Liu Jicong 已提交
521 522
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
523 524 525 526
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
527
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545
        char     offsetKey[TSDB_PARTITION_KEY_LEN];
        mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }
H
Haojun Liao 已提交
546

547
  // encode rsp
L
Liu Jicong 已提交
548
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
549 550
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
551
    terrno = TSDB_CODE_OUT_OF_MEMORY;
552
    return -1;
L
Liu Jicong 已提交
553
  }
H
Haojun Liao 已提交
554

555 556 557
  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = serverEpoch;
  ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
S
Shengliang Guan 已提交
558

559
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
560
  tEncodeSMqAskEpRsp(&abuf, &rsp);
561 562

  // release consumer and free memory
L
Liu Jicong 已提交
563
  tDeleteSMqAskEpRsp(&rsp);
564 565 566
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
S
Shengliang Guan 已提交
567 568
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
569
  return 0;
H
Haojun Liao 已提交
570

571
FAIL:
L
Liu Jicong 已提交
572
  tDeleteSMqAskEpRsp(&rsp);
573 574 575 576
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

L
Liu Jicong 已提交
577 578 579 580 581 582 583 584
/*
 * Append the encoded consumer to pTrans's commit log and mark it SDB_STATUS_DROPPED, so that
 * committing the transaction removes the consumer row. Returns 0 on success, -1 on any failure.
 */
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    return -1;
  }
  return (sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) == 0) ? 0 : -1;
}

585 586 587 588 589 590 591 592
/*
 * Append the encoded consumer to pTrans's commit log with status SDB_STATUS_READY, so that
 * committing the transaction writes the consumer row. Returns 0 on success, -1 on any failure.
 */
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    return -1;
  }
  return (sdbSetRawStatus(pRaw, SDB_STATUS_READY) == 0) ? 0 : -1;
}

X
Xiaoyu Wang 已提交
593
/*
 * Check that every topic name in pTopicList exists and that pUser may subscribe to it.
 * Returns 0 when all topics pass. Returns -1 at the first failing topic; terrno is left as set by
 * mndAcquireTopic / mndCheckTopicPrivilege.
 */
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
  int32_t total = taosArrayGetSize(pTopicList);

  for (int32_t idx = 0; idx < total; ++idx) {
    char        *pName = taosArrayGetP(pTopicList, idx);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pName);
    if (pTopic == NULL) {  // terrno has been set by callee function
      return -1;
    }

    int32_t rc = mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic);
    mndReleaseTopic(pMnode, pTopic);
    if (rc != 0) {
      return -1;
    }
  }

  return 0;
}

X
Xiaoyu Wang 已提交
614
// Element copier for taosArrayDup on topic-name arrays: returns a heap copy of the name string.
static void *topicNameDup(void *p) {
  char *pName = (char *)p;
  return taosStrdup(pName);
}
615

X
Xiaoyu Wang 已提交
616 617
static void freeItem(void *param) {
  void *pItem = *(void **)param;
618 619 620
  if (pItem != NULL) {
    taosMemoryFree(pItem);
  }
621 622
}

623 624 625 626
/*
 * Handle a consumer subscribe request. The request carries the complete desired
 * topic list for the consumer.
 *
 *   1. The topic names are sorted and de-duplicated.
 *   2. Each topic must exist and the requesting user must be allowed to subscribe.
 *   3. An SMqConsumerObj with updateType CONSUMER_UPDATE__REBALANCE is written as a
 *      commit log of a "subscribe" transaction:
 *        - unknown consumer: every requested topic becomes a new (to-rebalance) topic;
 *        - existing consumer (must be READY): rebNewTopics / rebRemovedTopics receive
 *          the difference between its currentTopics and the requested topics.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared, 0 when an
 * existing consumer's topics already match the request (nothing to rebalance), and
 * -1 on failure with terrno describing the error.
 */
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  char   *msgStr = pMsg->pCont;

  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);

  uint64_t        consumerId = subscribe.consumerId;
  char           *cgroup = subscribe.cgroup;
  SMqConsumerObj *pExistedConsumer = NULL;
  SMqConsumerObj *pConsumerNew = NULL;

  // Every `goto _over` below reports failure (-1, details in terrno) unless `code`
  // is explicitly set to another value first. `code` must therefore never be
  // overwritten with an intermediate success value.
  int32_t code = -1;

  SArray *pTopicList = subscribe.topicNames;
  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);

  int32_t newTopicNum = taosArrayGetSize(pTopicList);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
  if (pTrans == NULL) {
    goto _over;
  }

  // check topic existence and subscribe privilege
  if (validateTopics(pTopicList, pMnode, pMsg->info.conn.user) != TSDB_CODE_SUCCESS) {
    goto _over;
  }

  pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pExistedConsumer == NULL) {
    mInfo("receive subscribe request from new consumer:0x%" PRIx64 " cgroup:%s, numOfTopics:%d", consumerId,
          subscribe.cgroup, (int32_t)taosArrayGetSize(pTopicList));

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _over;
    }
    tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    // All subscribed topics should re-balance. The request's topic list is handed
    // over to the new consumer object, so detach it from `subscribe` to avoid a
    // double free at _over.
    taosArrayDestroy(pConsumerNew->rebNewTopics);
    pConsumerNew->rebNewTopics = pTopicList;
    subscribe.topicNames = NULL;

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  } else {
    int32_t status = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive subscribe request from existed consumer:0x%" PRIx64
          " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum);

    if (status != MQ_CONSUMER_STATUS__READY) {
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto _over;
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _over;
    }

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    int32_t oldTopicNum = (pExistedConsumer->currentTopics) ? taosArrayGetSize(pExistedConsumer->currentTopics) : 0;

    // Sorted-merge of the current topics against the (sorted) requested topics:
    // topics only in the request are new, topics only in the current list are removed.
    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
        char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
      } else if (j >= newTopicNum) {
        char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
      } else {
        char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
        char *newTopic = taosArrayGetP(pTopicList, j);
        int   comp = strcmp(oldTopic, newTopic);
        if (comp == 0) {
          i++;
          j++;
        } else if (comp < 0) {
          char *oldTopicCopy = taosStrdup(oldTopic);
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
        } else {
          char *newTopicCopy = taosStrdup(newTopic);
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
        }
      }
    }

    // no topics need to be rebalanced: the subscription is unchanged, which is not an error
    if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
      code = 0;
      goto _over;
    }

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  mndTransDrop(pTrans);

  if (pExistedConsumer) {
    mndReleaseConsumer(pMnode, pExistedConsumer);
  }

  if (pConsumerNew) {
    // tDeleteSMqConsumerObj releases the contents; the struct itself is freed here.
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
  }

  // TODO: replace with destroy subscribe msg
  taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
  return code;
}

L
Liu Jicong 已提交
761 762
/*
 * Serialize a consumer object into a newly allocated SDB raw record.
 *
 * Record layout: [int32 payload length][payload from tEncodeSMqConsumerObj]
 *                [MND_CONSUMER_RESERVE_SIZE reserved bytes]
 *
 * Returns the raw record, or NULL on failure (terrno set, error logged).
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  // Assume out-of-memory until every step has succeeded; callees overwrite
  // terrno with their own code when they fail.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *pPayload = NULL;
  int32_t payloadLen = tEncodeSMqConsumerObj(NULL, pConsumer);  // NULL buffer: only the length is computed
  int32_t rawSize = sizeof(int32_t) + payloadLen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto CM_ENCODE_OVER;

  pPayload = taosMemoryMalloc(payloadLen);
  if (pPayload == NULL) goto CM_ENCODE_OVER;

  void *pCursor = pPayload;
  tEncodeSMqConsumerObj(&pCursor, pConsumer);

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, payloadLen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pPayload, payloadLen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(pPayload);
  if (terrno != 0) {
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
  return pRaw;
}

L
Liu Jicong 已提交
797
/*
 * Deserialize an SDB raw record (layout written by mndConsumerActionEncode) into a
 * newly allocated row holding an SMqConsumerObj.
 *
 * Returns the row, or NULL on failure (terrno set, error logged).
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *buf = NULL;

  // The failure check at CM_DECODE_OVER inspects terrno, so it must not inherit a
  // value left over from an unrelated earlier call (that would discard a row that
  // decoded fine). As in mndConsumerActionEncode: assume failure, and clear terrno
  // only once every step has succeeded. Callees overwrite it with their own code.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }

  if (sver != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  int32_t dataPos = 0;
  int32_t len;
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

  terrno = TSDB_CODE_SUCCESS;

CM_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}

L
Liu Jicong 已提交
852
/*
 * SDB insert callback for a consumer row: logs the load and initializes
 * subscribeTime from upTime. Always returns 0.
 */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *pStatusName = mndConsumerStatusName(pConsumer->status);
  mDebug("consumer:0x%" PRIx64 " cgroup:%s status:%d(%s) epoch:%d load from sdb, perform insert action",
         pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, pStatusName, pConsumer->epoch);

  pConsumer->subscribeTime = pConsumer->upTime;
  return 0;
}

L
Liu Jicong 已提交
860
/*
 * SDB delete callback for a consumer row: logs the deletion and releases the
 * object's contents with tDeleteSMqConsumerObj (the struct itself is not freed
 * here). Always returns 0.
 */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *pStatusName = mndConsumerStatusName(pConsumer->status);
  mDebug("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
         pStatusName);

  tDeleteSMqConsumerObj(pConsumer);
  return 0;
}

X
Xiaoyu Wang 已提交
867
/*
 * Settle the consumer status once no rebalance work is pending, i.e. when both
 * rebNewTopics and rebRemovedTopics are empty:
 *   REBALANCE -> READY
 *   LOST      -> LOST_REBD (a lost consumer must hold no current/assigned topics)
 * Otherwise the status is left unchanged.
 */
static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
  bool pending = taosArrayGetSize(pConsumer->rebNewTopics) != 0 || taosArrayGetSize(pConsumer->rebRemovedTopics) != 0;
  if (pending) {
    return;
  }

  switch (pConsumer->status) {
    case MQ_CONSUMER_STATUS_REBALANCE:
      pConsumer->status = MQ_CONSUMER_STATUS__READY;
      break;
    case MQ_CONSUMER_STATUS__LOST:
      ASSERT(taosArrayGetSize(pConsumer->currentTopics) == 0 && taosArrayGetSize(pConsumer->assignedTopics) == 0);
      pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
      break;
    default:
      break;
  }
}

// remove from new topic
X
Xiaoyu Wang 已提交
881
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
882
  int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
883
  for (int32_t i = 0; i < size; i++) {
884 885 886 887 888 889
    char *p = taosArrayGetP(pConsumer->rebNewTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebNewTopics, i);
      taosMemoryFree(p);

      mDebug("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
X
Xiaoyu Wang 已提交
890
             pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics));
891 892 893 894 895 896
      break;
    }
  }
}

// remove from removed topic
X
Xiaoyu Wang 已提交
897
static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
898 899 900 901 902 903
  int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics);
  for (int32_t i = 0; i < size; i++) {
    char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebRemovedTopics, i);
      taosMemoryFree(p);
904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934

      mDebug("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d",
             pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics));
      break;
    }
  }
}

/*
 * Remove the first entry equal to pTopic from pConsumer->currentTopics and free
 * that entry's string. Does nothing when the topic is not in the list.
 */
static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
  int32_t numOfCurrent = taosArrayGetSize(pConsumer->currentTopics);

  for (int32_t idx = 0; idx < numOfCurrent; ++idx) {
    char *pEntry = taosArrayGetP(pConsumer->currentTopics, idx);
    if (strcmp(pTopic, pEntry) != 0) {
      continue;
    }

    taosArrayRemove(pConsumer->currentTopics, idx);
    taosMemoryFree(pEntry);

    mDebug("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d",
           pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics));
    return;
  }
}

/* Return true if pTopic is equal (strcmp) to some entry of pConsumer->currentTopics. */
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
  int32_t numOfCurrent = taosArrayGetSize(pConsumer->currentTopics);

  for (int32_t idx = 0; idx < numOfCurrent; ++idx) {
    const char *pEntry = taosArrayGetP(pConsumer->currentTopics, idx);
    if (strcmp(pEntry, pTopic) == 0) {
      return true;
    }
  }

  return false;
}

L
Liu Jicong 已提交
942
/*
 * SDB update callback: apply the change described by pNewConsumer->updateType to
 * pOldConsumer while holding pOldConsumer's write latch.
 *
 *   REBALANCE - swap rebNewTopics / rebRemovedTopics / assignedTopics with the new
 *               object, set subscribeTime from upTime and enter REBALANCE.
 *   LOST      - queue a copy of every current topic for removal and enter LOST.
 *   RECOVER   - queue a copy of every assigned topic as new and enter REBALANCE.
 *   TOUCH     - bump the epoch and refresh rebalanceTime.
 *   ADD       - take the first entry of pNewConsumer->rebNewTopics off the pending
 *               new list and insert it (if absent) into the sorted current list.
 *   REMOVE    - take the first entry of pNewConsumer->rebRemovedTopics off the
 *               pending removed list and the current list.
 * ADD and REMOVE additionally settle the status (updateConsumerStatus), refresh
 * rebalanceTime and bump the epoch. Other update types change nothing.
 * Always returns 0.
 */
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64,
         pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime);

  taosWLockLatch(&pOldConsumer->lock);

  switch (pNewConsumer->updateType) {
    case CONSUMER_UPDATE__REBALANCE: {
      TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
      TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
      TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);

      pOldConsumer->subscribeTime = pNewConsumer->upTime;
      pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
      break;
    }

    case CONSUMER_UPDATE__LOST: {
      int32_t numOfCurrent = taosArrayGetSize(pOldConsumer->currentTopics);
      for (int32_t k = 0; k < numOfCurrent; k++) {
        char *pCopy = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, k));
        taosArrayPush(pOldConsumer->rebRemovedTopics, &pCopy);
      }

      pOldConsumer->rebalanceTime = pNewConsumer->upTime;

      int32_t prevStatus = pOldConsumer->status;
      pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
      mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
             pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status),
             pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
      break;
    }

    case CONSUMER_UPDATE__RECOVER: {
      int32_t numOfAssigned = taosArrayGetSize(pOldConsumer->assignedTopics);
      for (int32_t k = 0; k < numOfAssigned; k++) {
        char *pCopy = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, k));
        taosArrayPush(pOldConsumer->rebNewTopics, &pCopy);
      }

      pOldConsumer->rebalanceTime = pNewConsumer->upTime;
      pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
      break;
    }

    case CONSUMER_UPDATE__TOUCH: {
      atomic_add_fetch_32(&pOldConsumer->epoch, 1);
      pOldConsumer->rebalanceTime = pNewConsumer->upTime;
      break;
    }

    case CONSUMER_UPDATE__ADD: {
      char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));

      // the topic is no longer pending
      removeFromNewTopicList(pOldConsumer, pNewTopic);

      // add to the current topics, keeping them sorted and free of duplicates
      if (existInCurrentTopicList(pOldConsumer, pNewTopic)) {
        taosMemoryFree(pNewTopic);
      } else {
        taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
        taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
      }

      int32_t status = pOldConsumer->status;  // snapshot for the log line
      updateConsumerStatus(pOldConsumer);

      // the re-balance is triggered when the new consumer is launched.
      pOldConsumer->rebalanceTime = pNewConsumer->upTime;
      atomic_add_fetch_32(&pOldConsumer->epoch, 1);

      mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
             ", current topics:%d, newTopics:%d, removeTopics:%d",
             pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
             mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
             (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
             (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
      break;
    }

    case CONSUMER_UPDATE__REMOVE: {
      char *pRemovedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);

      removeFromRemoveTopicList(pOldConsumer, pRemovedTopic);
      removeFromCurrentTopicList(pOldConsumer, pRemovedTopic);

      int32_t status = pOldConsumer->status;  // snapshot for the log line
      updateConsumerStatus(pOldConsumer);

      pOldConsumer->rebalanceTime = pNewConsumer->upTime;
      atomic_add_fetch_32(&pOldConsumer->epoch, 1);

      mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
             ", current topics:%d, newTopics:%d, removeTopics:%d",
             pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
             mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
             (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
             (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
      break;
    }

    default:
      break;
  }

  taosWUnLockLatch(&pOldConsumer->lock);
  return 0;
}

L
Liu Jicong 已提交
1041
/*
 * Acquire the consumer with the given id from the sdb.
 * Returns the consumer (to be released with mndReleaseConsumer), or NULL with
 * terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST when it is not found.
 */
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerObj *pConsumer = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (pConsumer != NULL) {
    return pConsumer;
  }

  terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
1050
/* Release a consumer previously obtained from mndAcquireConsumer. */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}
L
Liu Jicong 已提交
1054

S
Shengliang Guan 已提交
1055 1056
/*
 * Fill pBlock with rows for the consumer show/system table, resuming the sdb
 * iteration stored in pShow->pIter.
 *
 * Consumers without assigned topics are skipped. Every other consumer yields one
 * row per assigned topic with these columns:
 *   0 consumer id, 1 consumer group, 2 client id, 3 status name, 4 topic show-name,
 *   5 up time, 6 subscribe time, 7 rebalance time (NULL when 0).
 * If a consumer's rows would exceed rowsCapacity, the block capacity is grown so
 * that all of that consumer's rows fit.
 *
 * Returns the number of rows written by this call; pShow->numOfRows is advanced
 * by the same amount.
 */
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;

  for (;;) {
    if (numOfRows >= rowsCapacity) {
      break;
    }

    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
      mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    taosRLockLatch(&pConsumer->lock);
    mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    // Re-read under the latch; a consumer without topics would still get one row
    // with a NULL topic column.
    int32_t numOfTopics = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = (numOfTopics != 0);
    int32_t rowsToWrite = hasTopic ? numOfTopics : 1;

    if (numOfRows + rowsToWrite > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + rowsToWrite);
    }

    for (int32_t t = 0; t < rowsToWrite; t++) {
      SColumnInfoData *pCol = NULL;

      // 0: consumer id
      pCol = taosArrayGet(pBlock->pDataBlock, 0);
      colDataSetVal(pCol, numOfRows, (const char *)&pConsumer->consumerId, false);

      // 1: consumer group
      char cgroupBuf[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(cgroupBuf, pConsumer->cgroup);
      pCol = taosArrayGet(pBlock->pDataBlock, 1);
      colDataSetVal(pCol, numOfRows, (const char *)cgroupBuf, false);

      // 2: client id
      char clientIdBuf[256 + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(clientIdBuf, pConsumer->clientId);
      pCol = taosArrayGet(pBlock->pDataBlock, 2);
      colDataSetVal(pCol, numOfRows, (const char *)clientIdBuf, false);

      // 3: status
      char statusBuf[20 + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(statusBuf, mndConsumerStatusName(pConsumer->status));
      pCol = taosArrayGet(pBlock->pDataBlock, 3);
      colDataSetVal(pCol, numOfRows, (const char *)statusBuf, false);

      // 4: one subscribed topic
      pCol = taosArrayGet(pBlock->pDataBlock, 4);
      if (!hasTopic) {
        colDataSetVal(pCol, numOfRows, NULL, true);
      } else {
        char        topicBuf[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        const char *pShowName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, t));
        STR_TO_VARSTR(topicBuf, pShowName);
        colDataSetVal(pCol, numOfRows, (const char *)topicBuf, false);
      }

      // 5: up time
      pCol = taosArrayGet(pBlock->pDataBlock, 5);
      colDataSetVal(pCol, numOfRows, (const char *)&pConsumer->upTime, false);

      // 6: subscribe time
      pCol = taosArrayGet(pBlock->pDataBlock, 6);
      colDataSetVal(pCol, numOfRows, (const char *)&pConsumer->subscribeTime, false);

      // 7: rebalance time, NULL when never rebalanced
      pCol = taosArrayGet(pBlock->pDataBlock, 7);
      colDataSetVal(pCol, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);

      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);

    pBlock->info.rows = numOfRows;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

/* Abort an in-progress consumer iteration (pIter from sdbFetch). */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

static const char *mndConsumerStatusName(int status) {
  switch (status) {
    case MQ_CONSUMER_STATUS__READY:
      return "ready";
    case MQ_CONSUMER_STATUS__LOST:
    case MQ_CONSUMER_STATUS__LOST_REBD:
      return "lost";
1169
    case MQ_CONSUMER_STATUS_REBALANCE:
L
Liu Jicong 已提交
1170 1171 1172 1173 1174
      return "rebalancing";
    default:
      return "unknown";
  }
}