mndConsumer.c 41.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
L
Liu Jicong 已提交
18
#include "mndPrivilege.h"
L
Liu Jicong 已提交
19
#include "mndShow.h"
20
#include "mndSubscribe.h"
L
Liu Jicong 已提交
21 22
#include "mndTopic.h"
#include "mndTrans.h"
L
Liu Jicong 已提交
23
#include "tcompare.h"
L
Liu Jicong 已提交
24 25
#include "tname.h"

26
#define MND_CONSUMER_VER_NUMBER   1
L
Liu Jicong 已提交
27 28
#define MND_CONSUMER_RESERVE_SIZE 64

H
Haojun Liao 已提交
29
// The mq timer bumps each consumer's hbStatus once per run, while heartbeat and ask-ep requests reset it to 0.
// A READY consumer whose hbStatus grows beyond this value is queued for removal from its current topics.
#define MND_CONSUMER_LOST_HB_CNT          6
L
Liu Jicong 已提交
30
// A LOST consumer whose hbStatus grows beyond this many mq timer runs is dropped from the sdb.
// NOTE(review): the timer code describes this as "one day", which presumably assumes a 2-second timer period - confirm.
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
31

H
Haojun Liao 已提交
32
// Rebalance-in-progress counter, only touched through atomic operations:
// mndRebTryStart() moves it 0 -> 1, mndRebCntInc() adds one, mndRebCntDec()/mndRebEnd() subtract one.
static int32_t mqRebInExecCnt = 0;
33

L
Liu Jicong 已提交
34 35
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
36 37
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
38
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
39
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
40
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42

S
Shengliang Guan 已提交
43 44
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
45
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
46 47
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
L
Liu Jicong 已提交
48
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
49
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
50

L
Liu Jicong 已提交
51
/**
 * Wire the consumer module into an mnode.
 *
 * Installs the message handlers for the tmq subscribe / heartbeat / ask-ep /
 * timer / recover / clear messages, installs the show handlers for the
 * consumers table, and finally registers the SDB_CONSUMER table (keyed by an
 * int64) with the mnode's sdb.
 *
 * @param pMnode  mnode being initialised
 * @return        whatever sdbSetTable() returns for the consumer table
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  // message handlers
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
  // TDMT_MND_TMQ_CONSUMER_LOST is deliberately not registered: its handler is disabled in this file.
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // "show consumers" support
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  // sdb table descriptor; every field not assigned below stays zero
  SSdbTable consumerTable = {0};
  consumerTable.sdbType = SDB_CONSUMER;
  consumerTable.keyType = SDB_KEY_INT64;
  consumerTable.encodeFp = (SdbEncodeFp)mndConsumerActionEncode;
  consumerTable.decodeFp = (SdbDecodeFp)mndConsumerActionDecode;
  consumerTable.insertFp = (SdbInsertFp)mndConsumerActionInsert;
  consumerTable.updateFp = (SdbUpdateFp)mndConsumerActionUpdate;
  consumerTable.deleteFp = (SdbDeleteFp)mndConsumerActionDelete;

  return sdbSetTable(pMnode->pSdb, consumerTable);
}

// Counterpart of mndInitConsumer(); intentionally empty, it releases nothing.
void mndCleanupConsumer(SMnode *pMnode) {}

78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
/**
 * Request the removal of a consumer from the sdb.
 *
 * Nothing is dropped here directly: a TDMT_MND_TMQ_LOST_CONSUMER_CLEAR message
 * carrying the consumer id is posted to the mnode write queue, and the handler
 * registered for that message type performs the drop.
 *
 * Failures (allocation or enqueue) are logged; the function has no return
 * value, so the caller is not notified.
 *
 * @param pMnode      mnode whose write queue receives the message
 * @param consumerId  id of the consumer to drop
 */
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
  if (pClearMsg == NULL) {
    mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
    return;
  }

  pClearMsg->consumerId = consumerId;
  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};

  mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId);

  // Do not swallow an enqueue failure silently: without the message the consumer stays in the sdb.
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) != 0) {
    mError("consumer:0x%" PRIx64 " failed to put the clear msg into write queue", consumerId);
  }
}

L
Liu Jicong 已提交
94
bool mndRebTryStart() {
H
Haojun Liao 已提交
95
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
C
cadem 已提交
96
  mDebug("tq timer, rebalance counter old val:%d", old);
L
Liu Jicong 已提交
97 98 99
  return old == 0;
}

X
Xiaoyu Wang 已提交
100
// Finish a rebalance round: gives one unit of the rebalance counter back via mndRebCntDec().
void mndRebEnd() {
  mndRebCntDec();
}
L
Liu Jicong 已提交
101

S
Shengliang Guan 已提交
102 103
// Atomically add one to the rebalance counter and log the resulting value.
void mndRebCntInc() {
  const int32_t counter = atomic_add_fetch_32(&mqRebInExecCnt, 1);
  mInfo("rebalance trans start, rebalance counter:%d", counter);
}
L
Liu Jicong 已提交
106

S
Shengliang Guan 已提交
107
void mndRebCntDec() {
108 109 110 111 112 113 114 115 116 117
  while (1) {
    int32_t val = atomic_load_32(&mqRebInExecCnt);
    if (val <= 0) {
      mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
      break;
    }

    int32_t newVal = val - 1;
    int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
    if (oldVal == val) {
C
cadem 已提交
118
      mDebug("rebalance trans end, rebalance counter:%d", newVal);
119 120 121
      break;
    }
  }
S
Shengliang Guan 已提交
122
}
L
Liu Jicong 已提交
123

wmmhello's avatar
wmmhello 已提交
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
//  SMnode             *pMnode = pMsg->info.node;
//  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
//  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
//  if (pConsumer == NULL) {
//    return 0;
//  }
//
//  mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
//        mndConsumerStatusName(pConsumer->status));
//
//  if (pConsumer->status != MQ_CONSUMER_STATUS_READY) {
//    mndReleaseConsumer(pMnode, pConsumer);
//    return -1;
//  }
//
//  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
//  pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
//
//  mndReleaseConsumer(pMnode, pConsumer);
//
//  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
//  if (pTrans == NULL) {
//    goto FAIL;
//  }
//
//  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
//    goto FAIL;
//  }
//
//  if (mndTransPrepare(pMnode, pTrans) != 0) {
//    goto FAIL;
//  }
//
//  tDeleteSMqConsumerObj(pConsumerNew, true);
//  mndTransDrop(pTrans);
//  return 0;
//FAIL:
//  tDeleteSMqConsumerObj(pConsumerNew, true);
//  mndTransDrop(pTrans);
//  return -1;
//}
166

S
Shengliang Guan 已提交
167 168 169
/**
 * Handle TDMT_MND_TMQ_CONSUMER_RECOVER: bring a LOST consumer back.
 *
 * Looks the consumer up by the id carried in the message. If it is in LOST
 * status, a fresh consumer object tagged CONSUMER_UPDATE_RECOVER is committed
 * through a "recover-csm" transaction; the sdb update action is expected to
 * apply the actual status change.
 *
 * @param pMsg  rpc message whose pCont is an SMqConsumerRecoverMsg
 * @return 0 on success; -1 when the consumer does not exist, is not LOST
 *         (terrno = TSDB_CODE_MND_CONSUMER_NOT_READY), the update object cannot
 *         be allocated (terrno = TSDB_CODE_OUT_OF_MEMORY), or the transaction
 *         cannot be created / filled / prepared
 */
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
  SMqConsumerObj        *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
  if (pConsumer == NULL) {
    mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
    return -1;
  }

  mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
        pConsumer->status, mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  // Only the id and cgroup are needed from the stored object, so it can be released right after the copy.
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);

  // tNewSMqConsumerObj() may fail (the subscribe handler checks for it as well); do not dereference NULL.
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE_RECOVER;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
  if (pTrans == NULL) {
    goto _over;
  }

  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;

  code = 0;

_over:
  // Shared cleanup for success and failure; pTrans may still be NULL here, as in the original failure path.
  tDeleteSMqConsumerObj(pConsumerNew, true);
  mndTransDrop(pTrans);
  return code;
}

209
// todo check the clear process
L
Liu Jicong 已提交
210 211 212
/**
 * Handle TDMT_MND_TMQ_LOST_CONSUMER_CLEAR: drop a LOST consumer from the sdb.
 *
 * A consumer that no longer exists is treated as already cleared (returns 0).
 * A consumer that is not in LOST status is left alone (returns -1). Otherwise a
 * "clear-csm" transaction carrying a drop log for the consumer is prepared.
 *
 * @param pMsg  rpc message whose pCont is an SMqConsumerClearMsg
 * @return 0 on success or when the consumer is already gone; -1 when the
 *         consumer is not LOST, the helper object cannot be allocated
 *         (terrno = TSDB_CODE_OUT_OF_MEMORY), or the transaction fails
 */
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%"PRIx64" failed to be found to clear it", pClearMsg->consumerId);
    return 0;
  }

  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  // Only the id and cgroup are needed to build the drop record.
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);

  // tNewSMqConsumerObj() may fail (the subscribe handler checks for it as well); bail out before using it.
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  if (pTrans == NULL) goto _over;

  // this is the drop action, not the update action
  if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;

  code = 0;

_over:
  // Shared cleanup for success and failure; pTrans may still be NULL here, as in the original failure path.
  tDeleteSMqConsumerObj(pConsumerNew, true);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
252
/**
 * Return the rebalance info stored in pHash under `key`, creating it first
 * when it does not exist yet.
 *
 * The hash stores a by-value copy of SMqRebInfo, so after a successful insert
 * the temporary shell is freed (its arrays are now referenced by the copy in
 * the hash) and the pointer into the hash is returned.
 *
 * @param pHash  hash of SMqRebInfo keyed by the NUL-terminated subscribe key
 * @param key    subscribe key (NUL-terminated string)
 * @return pointer to the entry inside the hash, or NULL with
 *         terrno = TSDB_CODE_OUT_OF_MEMORY when the entry cannot be created
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  // The terminating NUL is part of the hash key; compute the length once.
  const size_t keyLen = strlen(key) + 1;

  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, keyLen);
  if (pRebInfo != NULL) {
    return pRebInfo;
  }

  SMqRebInfo *pNewInfo = tNewSMqRebSubscribe(key);
  if (pNewInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (taosHashPut(pHash, key, keyLen, pNewInfo, sizeof(SMqRebInfo)) != 0) {
    // Nothing was stored, so the arrays are still owned by the temporary object (same members that
    // freeRebalanceItem releases for stored entries).
    taosArrayDestroy(pNewInfo->newConsumers);
    taosArrayDestroy(pNewInfo->removedConsumers);
    taosMemoryFree(pNewInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // The hash holds a shallow copy now; release only the shell.
  taosMemoryFree(pNewInfo);
  return taosHashGet(pHash, key, keyLen);
}

X
Xiaoyu Wang 已提交
267 268
static void freeRebalanceItem(void *param) {
  SMqRebInfo *pInfo = param;
H
Haojun Liao 已提交
269 270 271 272
  taosArrayDestroy(pInfo->newConsumers);
  taosArrayDestroy(pInfo->removedConsumers);
}

S
Shengliang Guan 已提交
273 274
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
275 276 277 278
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

D
dapan1121 已提交
279
  mDebug("start to process mq timer");
280

281
  // rebalance cannot be parallel
L
Liu Jicong 已提交
282
  if (!mndRebTryStart()) {
D
dapan1121 已提交
283
    mDebug("mq rebalance already in progress, do nothing");
284 285 286 287
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
288
  if (pRebMsg == NULL) {
X
Xiaoyu Wang 已提交
289
    mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
290 291 292 293
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

294
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
H
Haojun Liao 已提交
295 296 297 298 299 300 301 302
  if (pRebMsg->rebSubHash == NULL) {
    mError("failed to create rebalance hashmap");
    rpcFreeCont(pRebMsg);
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
303 304 305 306

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
307 308 309
    if (pIter == NULL) {
      break;
    }
310 311 312

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
H
Haojun Liao 已提交
313

wmmhello's avatar
wmmhello 已提交
314 315
    mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
X
Xiaoyu Wang 已提交
316
           hbStatus);
L
Liu Jicong 已提交
317

wmmhello's avatar
wmmhello 已提交
318
    if (status == MQ_CONSUMER_STATUS_READY) {
H
Haojun Liao 已提交
319
      if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
wmmhello's avatar
wmmhello 已提交
320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
//        SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
//        if (pLostMsg == NULL) {
//          mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d",
//              pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg));
//          continue;
//        }
//
//        pLostMsg->consumerId = pConsumer->consumerId;
//        SRpcMsg rpcMsg = {
//            .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)};
//
//        mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId,
//            MND_CONSUMER_LOST_HB_CNT);
//        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

        taosRLockLatch(&pConsumer->lock);
        int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
        for (int32_t i = 0; i < topicNum; i++) {
          char  key[TSDB_SUBSCRIBE_KEY_LEN];
          char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
          mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
          SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
          taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
H
Haojun Liao 已提交
343
        }
wmmhello's avatar
wmmhello 已提交
344
        taosRUnLockLatch(&pConsumer->lock);
H
Haojun Liao 已提交
345
      }
wmmhello's avatar
wmmhello 已提交
346
    } else if (status == MQ_CONSUMER_STATUS_LOST) {
H
Haojun Liao 已提交
347
      // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
wmmhello's avatar
wmmhello 已提交
348
      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD || taosArrayGetSize(pConsumer->assignedTopics) == 0) {   // clear consumer if lost a day or unsubscribe/close
349
        mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
L
Liu Jicong 已提交
350
      }
351
    } else {  // MQ_CONSUMER_STATUS_REBALANCE
L
Liu Jicong 已提交
352
      taosRLockLatch(&pConsumer->lock);
353

354 355 356 357 358
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
359
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
360 361 362 363 364 365 366 367
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
368
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
369 370
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
371
      taosRUnLockLatch(&pConsumer->lock);
372 373 374 375 376 377 378 379
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
L
Liu Jicong 已提交
380
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
381 382 383 384 385 386 387
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
H
Haojun Liao 已提交
388
    mDebug("mq timer finished, no need to re-balance");
L
Liu Jicong 已提交
389
    mndRebEnd();
390 391 392 393
  }
  return 0;
}

394
/**
 * Handle TDMT_MND_TMQ_HB: a heartbeat from a consumer.
 *
 * Resets the consumer's hbStatus counter to 0. If the consumer is currently
 * LOST, a TDMT_MND_TMQ_CONSUMER_RECOVER message for it is posted to the write
 * queue so that it can be brought back.
 *
 * @param pMsg  rpc message carrying a serialized SMqHbReq
 * @return 0 on success; -1 with terrno set when the request cannot be
 *         deserialized, the consumer does not exist
 *         (TSDB_CODE_MND_CONSUMER_NOT_EXIST), or the recover message cannot be
 *         allocated (TSDB_CODE_OUT_OF_MEMORY)
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  SMnode  *pMnode = pMsg->info.node;
  SMqHbReq req = {0};

  if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t         consumerId = req.consumerId;
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " not exist", consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  // the consumer is alive: restart the missed-heartbeat count
  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS_LOST) {
    mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // rpcMallocCont() can fail (checked elsewhere in this file); do not write through NULL.
      mError("consumer:0x%" PRIx64 " failed to recover consumer due to out of memory. alloc size:%d", consumerId,
             (int32_t)sizeof(SMqConsumerRecoverMsg));
      mndReleaseConsumer(pMnode, pConsumer);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    pRecoverMsg->consumerId = consumerId;
    SRpcMsg pRpcMsg = {
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
  }

  mndReleaseConsumer(pMnode, pConsumer);

  return 0;
}

S
Shengliang Guan 已提交
433
/**
 * Handle TDMT_MND_TMQ_ASK_EP: a consumer asks for its endpoint assignment.
 *
 * Steps:
 *   1. Validate the consumer (exists, cgroup matches), reset its hbStatus
 *      counter, and if it is LOST post a recover message. Any status other
 *      than READY ends the request with TSDB_CODE_MND_CONSUMER_NOT_READY.
 *   2. If the client's epoch differs from the server's, build one
 *      SMqSubTopicEp per current topic: topic name, db, a copy of the topic
 *      schema, and the vgroups assigned to this consumer.
 *      Topics whose subscription, topic object, or consumer entry cannot be
 *      found are logged and left out of the response.
 *   3. Encode SMqRspHead + SMqAskEpRsp into the rpc response buffer.
 *
 * @param pMsg  rpc message carrying a serialized SMqAskEpReq; on success
 *              pMsg->info.rsp / rspLen receive the encoded response
 * @return 0 on success, -1 with terrno set on failure
 */
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  SMnode     *pMnode = pMsg->info.node;
  SMqAskEpReq req = {0};
  SMqAskEpRsp rsp = {0};

  if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t consumerId = req.consumerId;
  int32_t epoch = req.epoch;

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
  if (ret != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    goto FAIL;
  }

  // an ask-ep request also counts as a sign of life
  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS_LOST) {
    mInfo("try to recover consumer:0x%" PRIx64, consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // rpcMallocCont() can fail (checked elsewhere in this file); do not write through NULL.
      mError("consumer:0x%" PRIx64 " failed to recover consumer due to out of memory. alloc size:%d", consumerId,
             (int32_t)sizeof(SMqConsumerRecoverMsg));
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    pRecoverMsg->consumerId = consumerId;
    SRpcMsg pRpcMsg = {
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };

    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
  }

  if (status != MQ_CONSUMER_STATUS_READY) {
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    goto FAIL;
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

  // 2. check epoch, only send ep info when epochs do not match
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
          serverEpoch);
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosRUnLockLatch(&pConsumer->lock);
      goto FAIL;
    }

    // handle all topics subscribed by this consumer
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
      // the txn is expected to have created pSub, but a failed lookup must not crash the mnode
      if (pSub == NULL) {
        mError("consumer:0x%" PRIx64 " subscription of topic:%s not found, skip it in ep rsp", consumerId, topic);
        continue;
      }

      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      // bounded copy, consistent with the db name copy below
      tstrncpy(topicEp.topic, topic, tListLen(topicEp.topic));

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      if (pTopic == NULL) {
        mError("consumer:0x%" PRIx64 " topic:%s not found, skip it in ep rsp", consumerId, topic);
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        continue;
      }
      taosRLockLatch(&pTopic->lock);
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
      topicEp.schema.nCols = pTopic->schema.nCols;
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        if (topicEp.schema.pSchema != NULL) {
          memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
        }
      }
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      // the schema copy could not be allocated: fail the request instead of copying into NULL
      if (topicEp.schema.nCols && topicEp.schema.pSchema == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        taosRUnLockLatch(&pConsumer->lock);
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        goto FAIL;
      }

      // 2.2 iterate all vg assigned to the consumer of that topic
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      if (pConsumerEp == NULL) {
        mError("consumer:0x%" PRIx64 " not found in subscription of topic:%s, skip it in ep rsp", consumerId, topic);
        // topicEp was never handed to rsp, so its schema copy must be released here
        if (topicEp.schema.pSchema != NULL) {
          taosMemoryFree(topicEp.schema.pSchema);
        }
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        continue;
      }
      int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);

      // this customer assigned vgroups
      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        // topicEp was never handed to rsp, so its schema copy must be released here
        if (topicEp.schema.pSchema != NULL) {
          taosMemoryFree(topicEp.schema.pSchema);
        }
        taosRUnLockLatch(&pConsumer->lock);
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
        char     offsetKey[TSDB_PARTITION_KEY_LEN];
        mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      // from here on rsp owns topicEp's schema and vgs
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }

  // encode rsp
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  SMqRspHead *pHead = buf;

  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  pHead->epoch = serverEpoch;
  pHead->consumerId = pConsumer->consumerId;
  pHead->walsver = 0;
  pHead->walever = 0;

  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  tEncodeSMqAskEpRsp(&abuf, &rsp);

  // release consumer and free memory
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
  return 0;

FAIL:
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

L
Liu Jicong 已提交
595 596 597 598 599 600 601 602
/**
 * Add a "drop this consumer" commit log to a transaction.
 *
 * The consumer is encoded into an sdb raw record, the record is appended to
 * the transaction's commit logs, and then marked SDB_STATUS_DROPPED.
 *
 * @return 0 on success, -1 if encoding, appending, or status marking fails
 */
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  if (pRaw == NULL) {
    return -1;
  }

  if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    return -1;
  }

  return (sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) ? -1 : 0;
}

603 604 605 606 607 608 609 610
/**
 * Add an insert/update commit log for a consumer to a transaction.
 *
 * The consumer is encoded into an sdb raw record, the record is appended to
 * the transaction's commit logs, and then marked SDB_STATUS_READY.
 *
 * @return 0 on success, -1 if encoding, appending, or status marking fails
 */
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  if (pRaw == NULL) {
    return -1;
  }

  if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    return -1;
  }

  return (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) ? -1 : 0;
}

X
Xiaoyu Wang 已提交
611
/**
 * Check that every topic in the list exists and that the user may subscribe to it.
 *
 * Stops at the first topic that cannot be acquired or for which
 * mndCheckTopicPrivilege(MND_OPER_SUBSCRIBE) fails.
 *
 * @param pTopicList  array of topic name pointers
 * @param pMnode      mnode used for topic lookup and privilege check
 * @param pUser       user name the privilege is checked for
 * @return 0 when all topics pass, -1 otherwise (terrno is left as set by the callee)
 */
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
  const int32_t total = taosArrayGetSize(pTopicList);
  int32_t       idx = 0;

  while (idx < total) {
    char        *topicName = taosArrayGetP(pTopicList, idx);
    SMqTopicObj *pTopicObj = mndAcquireTopic(pMnode, topicName);
    if (pTopicObj == NULL) {
      // terrno has been set by callee function
      return -1;
    }

    // the topic is released on both outcomes of the privilege check
    bool denied = (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopicObj) != 0);
    mndReleaseTopic(pMnode, pTopicObj);
    if (denied) {
      return -1;
    }

    idx++;
  }

  return 0;
}

X
Xiaoyu Wang 已提交
632
// Element-duplication callback for topic name arrays: returns a taosStrdup() copy of the given string.
static void *topicNameDup(void *p) {
  char *name = (char *)p;
  return taosStrdup(name);
}
633

X
Xiaoyu Wang 已提交
634 635
static void freeItem(void *param) {
  void *pItem = *(void **)param;
636 637 638
  if (pItem != NULL) {
    taosMemoryFree(pItem);
  }
639 640
}

641 642 643 644
/*
 * Handle a subscribe request sent by a consumer.
 *
 * The requested topic names are sorted and de-duplicated, validated with
 * validateTopics(), and then turned into a "subscribe" transaction:
 *   - unknown consumer id: a new consumer object is built whose assignedTopics
 *     and rebNewTopics both hold the full requested list;
 *   - known consumer id: the sorted requested list is merged against the
 *     consumer's currentTopics to derive rebNewTopics (requested but not
 *     current) and rebRemovedTopics (current but no longer requested).
 *
 * Returns:
 *   TSDB_CODE_ACTION_IN_PROGRESS  the transaction was prepared;
 *   TSDB_CODE_SUCCESS             known consumer and nothing to re-balance;
 *   code from validateTopics()    topic validation failed;
 *   -1                            any other failure (terrno set where known).
 */
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  char   *msgStr = pMsg->pCont;

  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);

  int64_t         consumerId = subscribe.consumerId;
  char           *cgroup = subscribe.cgroup;
  SMqConsumerObj *pExistedConsumer = NULL;
  SMqConsumerObj *pConsumerNew = NULL;

  int32_t code = -1;
  SArray *pTopicList = subscribe.topicNames;

  // the merge below relies on the requested list being sorted and unique
  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);

  int32_t newTopicNum = taosArrayGetSize(pTopicList);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
  if (pTrans == NULL) {
    goto _over;
  }

  // check topic existence
  code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
  if (code != TSDB_CODE_SUCCESS) {
    goto _over;
  }

  // validateTopics() left `code` at success; put it back to "failed" so that
  // every `goto _over` below reports an error unless it sets a result itself.
  code = -1;

  pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pExistedConsumer == NULL) {
    mInfo("receive subscribe request from new consumer:0x%" PRIx64 " cgroup:%s, numOfTopics:%d", consumerId,
          subscribe.cgroup, newTopicNum);

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      // overwrite the "consumer not exist" terrno left by mndAcquireConsumer()
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _over;
    }

    tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));

    // updateType is deliberately not set here (original note: "use insert logic")
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    // all subscribed topics should re-balance; the request's list is handed
    // over to the consumer object, so it must not be freed again at _over.
    taosArrayDestroy(pConsumerNew->rebNewTopics);
    pConsumerNew->rebNewTopics = pTopicList;
    subscribe.topicNames = NULL;

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;

  } else {
    int32_t status = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive subscribe request from existed consumer:0x%" PRIx64
          " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum);

    if (status != MQ_CONSUMER_STATUS_READY) {
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto _over;
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _over;
    }

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);

    // two-pointer merge of currentTopics (index i) and the requested list (index j)
    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
        // only requested topics remain: all of them are new
        char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
      } else if (j >= newTopicNum) {
        // only current topics remain: all of them are removed
        char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
      } else {
        char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
        char *newTopic = taosArrayGetP(pTopicList, j);
        int   comp = strcmp(oldTopic, newTopic);
        if (comp == 0) {
          // present on both sides: unchanged
          i++;
          j++;
        } else if (comp < 0) {
          char *oldTopicCopy = taosStrdup(oldTopic);
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
        } else {
          char *newTopicCopy = taosStrdup(newTopic);
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
        }
      }
    }

    // no topics need to be rebalanced: nothing to commit, report plain success
    if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
      code = TSDB_CODE_SUCCESS;
      goto _over;
    }

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  mndTransDrop(pTrans);

  if (pExistedConsumer) {
    mndReleaseConsumer(pMnode, pExistedConsumer);
  }

  tDeleteSMqConsumerObj(pConsumerNew, true);

  // TODO: replace with destroy subscribe msg
  taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
  return code;
}

L
Liu Jicong 已提交
775 776
/*
 * Serialize a consumer object into a freshly allocated SDB raw record.
 *
 * Record layout written here: an int32 holding the encoded length, the encoded
 * consumer bytes, then MND_CONSUMER_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record on success; on failure the record is released with
 * sdbFreeRaw() and NULL is returned with terrno left non-zero.
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  // pre-set so that the allocation failures below exit with an error code
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void *pEncoded = NULL;

  // first pass with a NULL buffer only computes the encoded size
  int32_t encodedLen = tEncodeSMqConsumerObj(NULL, pConsumer);
  int32_t rawSize = sizeof(int32_t) + encodedLen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto _encode_done;

  pEncoded = taosMemoryMalloc(encodedLen);
  if (pEncoded == NULL) goto _encode_done;

  // the encoder advances the pointer it is given, so hand it a copy
  void *pCursor = pEncoded;
  tEncodeSMqConsumerObj(&pCursor, pConsumer);

  int32_t offset = 0;
  SDB_SET_INT32(pRaw, offset, encodedLen, _encode_done);
  SDB_SET_BINARY(pRaw, offset, pEncoded, encodedLen, _encode_done);
  SDB_SET_RESERVE(pRaw, offset, MND_CONSUMER_RESERVE_SIZE, _encode_done);
  SDB_SET_DATALEN(pRaw, offset, _encode_done);

  terrno = TSDB_CODE_SUCCESS;

_encode_done:
  taosMemoryFreeClear(pEncoded);

  if (terrno == 0) {
    mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
    return pRaw;
  }

  mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

L
Liu Jicong 已提交
811
/*
 * Rebuild a consumer row from an SDB raw record produced by
 * mndConsumerActionEncode().
 *
 * The record's soft version must equal MND_CONSUMER_VER_NUMBER. The payload
 * length, payload bytes and reserved area are read in that order, the payload
 * is decoded into the row object, and the consumer's endpoint set is refreshed
 * through tmsgUpdateDnodeEpSet().
 *
 * Returns the row on success. When terrno is non-zero at the end, the row is
 * freed and the (cleared) row pointer is returned.
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *pPayload = NULL;
  int8_t          softVer = 0;
  int32_t         offset = 0;
  int32_t         payloadLen = 0;

  terrno = 0;

  if (sdbGetRawSoftVer(pRaw, &softVer) != 0) {
    goto _decode_done;
  }

  if (softVer != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _decode_done;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto _decode_done;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto _decode_done;
  }

  SDB_GET_INT32(pRaw, offset, &payloadLen, _decode_done);

  pPayload = taosMemoryMalloc(payloadLen);
  if (pPayload == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _decode_done;
  }

  SDB_GET_BINARY(pRaw, offset, pPayload, payloadLen, _decode_done);
  SDB_GET_RESERVE(pRaw, offset, MND_CONSUMER_RESERVE_SIZE, _decode_done);

  if (tDecodeSMqConsumerObj(pPayload, pConsumer) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto _decode_done;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

_decode_done:
  taosMemoryFreeClear(pPayload);

  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}

L
Liu Jicong 已提交
867
// SDB insert action for a consumer row: logs the consumer's group, status and
// epoch, then stamps subscribeTime with the current time in milliseconds.
// `pSdb` is not used. Always returns 0.
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *statusName = mndConsumerStatusName(pConsumer->status);

  mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId,
        pConsumer->cgroup, pConsumer->status, statusName, pConsumer->epoch);

  pConsumer->subscribeTime = taosGetTimestampMs();
  return 0;
}

L
Liu Jicong 已提交
875
// SDB delete action for a consumer row: logs the consumer's status and hands
// the object to tDeleteSMqConsumerObj() with its second argument set to false.
// `pSdb` is not used. Always returns 0.
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *statusName = mndConsumerStatusName(pConsumer->status);

  mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
        statusName);

  tDeleteSMqConsumerObj(pConsumer, false);
  return 0;
}

X
Xiaoyu Wang 已提交
882
// Advance the consumer's status once both pending re-balance lists
// (rebNewTopics and rebRemovedTopics) are empty:
//   REBALANCE -> READY, READY -> LOST; any other status is left alone.
// Does nothing while either list still has entries.
static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
  bool hasPending =
      taosArrayGetSize(pConsumer->rebNewTopics) != 0 || taosArrayGetSize(pConsumer->rebRemovedTopics) != 0;
  if (hasPending) {
    return;
  }

  switch (pConsumer->status) {
    case MQ_CONSUMER_STATUS_REBALANCE:
      pConsumer->status = MQ_CONSUMER_STATUS_READY;
      break;
    case MQ_CONSUMER_STATUS_READY:
      pConsumer->status = MQ_CONSUMER_STATUS_LOST;
      break;
    default:
      break;
  }
}

// remove from new topic
X
Xiaoyu Wang 已提交
895
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
896
  int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
897
  for (int32_t i = 0; i < size; i++) {
898 899 900 901 902
    char *p = taosArrayGetP(pConsumer->rebNewTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebNewTopics, i);
      taosMemoryFree(p);

wmmhello's avatar
wmmhello 已提交
903
      mInfo("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
X
Xiaoyu Wang 已提交
904
             pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics));
905 906 907 908 909 910
      break;
    }
  }
}

// remove from removed topic
X
Xiaoyu Wang 已提交
911
static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
912 913 914 915 916 917
  int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics);
  for (int32_t i = 0; i < size; i++) {
    char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebRemovedTopics, i);
      taosMemoryFree(p);
918

wmmhello's avatar
wmmhello 已提交
919
      mInfo("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d",
920
             pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics));
921 922 923 924 925
      break;
    }
  }
}

926 927 928 929 930 931 932 933
// Remove the first entry equal to `pTopic` from the consumer's currentTopics
// list, free that entry's string and log the remaining count. No-op when the
// topic is not in the list.
static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
  SArray *pList = pConsumer->currentTopics;
  int32_t num = taosArrayGetSize(pList);

  for (int32_t idx = 0; idx < num; ++idx) {
    char *name = taosArrayGetP(pList, idx);
    if (strcmp(pTopic, name) != 0) {
      continue;
    }

    taosArrayRemove(pList, idx);
    taosMemoryFree(name);

    mInfo("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d",
          pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pList));
    return;
  }
}

// Linear search of the consumer's currentTopics list.
// Returns true when an entry compares equal (strcmp) to `pTopic`.
static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* pTopic) {
  int32_t num = taosArrayGetSize(pConsumer->currentTopics);

  for (int32_t idx = 0; idx < num; ++idx) {
    char *name = taosArrayGetP(pConsumer->currentTopics, idx);
    if (strcmp(name, pTopic) == 0) {
      return true;
    }
  }

  return false;
}

L
Liu Jicong 已提交
956
/*
 * SDB update action for a consumer row. Applies the change described by
 * pNewConsumer->updateType to pOldConsumer while holding the old consumer's
 * write latch:
 *
 *   CONSUMER_UPDATE_SUB_MODIFY         swap in the new rebNewTopics /
 *                                      rebRemovedTopics / assignedTopics lists,
 *                                      stamp subscribeTime, status -> REBALANCE
 *   CONSUMER_UPDATE_RECOVER            queue copies of every assigned topic in
 *                                      rebNewTopics, status -> REBALANCE
 *   CONSUMER_UPDATE_REB_MODIFY_NOTOPIC bump epoch and stamp rebalanceTime
 *   CONSUMER_UPDATE_REB_MODIFY_TOPIC   move the first topic of the new object's
 *                                      rebNewTopics from pending to current
 *   CONSUMER_UPDATE_REB_MODIFY_REMOVE  drop the first topic of the new object's
 *                                      rebRemovedTopics from pending-removed and
 *                                      from current
 *
 * Any other update type only takes and releases the latch.
 * `pSdb` is not used. Always returns 0.
 */
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);

  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB_MODIFY) {
    // the old lists end up in pNewConsumer after the swap
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);

    pOldConsumer->subscribeTime = taosGetTimestampMs();
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
    mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer",pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_RECOVER) {
    int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
      taosArrayPush(pOldConsumer->rebNewTopics, &topic);
    }

    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
    mInfo("consumer:0x%" PRIx64 " timer update, timer recover",pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_NOTOPIC) {
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_TOPIC) {
    // private copy: it is either stored in currentTopics or freed below
    char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));

    // the topic is no longer pending
    removeFromNewTopicList(pOldConsumer, pNewTopic);

    // add to current topic, keeping the list sorted
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
    if (existing) {
      mError("consumer:0x%" PRIx64 " new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
      taosMemoryFree(pNewTopic);
    } else {
      taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
    }

    // set status; keep the previous value for the log line
    int32_t status = pOldConsumer->status;
    updateConsumerStatus(pOldConsumer);

    // the re-balance is triggered when the new consumer is launched.
    pOldConsumer->rebalanceTime = taosGetTimestampMs();

    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
    mInfo("consumer:0x%" PRIx64 " reb update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_REMOVE) {
    // borrowed from pNewConsumer; not freed here
    char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);

    // remove from removed topic
    removeFromRemoveTopicList(pOldConsumer, removedTopic);

    // remove from current topic
    removeFromCurrentTopicList(pOldConsumer, removedTopic);

    // set status; keep the previous value for the log line
    int32_t status = pOldConsumer->status;
    updateConsumerStatus(pOldConsumer);

    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("consumer:0x%" PRIx64 " reb update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
  }

  taosWUnLockLatch(&pOldConsumer->lock);
  return 0;
}

L
Liu Jicong 已提交
1055
// Look up a consumer by id in the mnode's SDB via sdbAcquire().
// Returns the object, or NULL with terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST
// when the lookup yields nothing. Pair with mndReleaseConsumer().
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerObj *pFound = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
1064
// Hand a consumer object back to the mnode's SDB via sdbRelease().
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}
L
Liu Jicong 已提交
1068

S
Shengliang Guan 已提交
1069 1070
/*
 * Fill `pBlock` with rows describing consumers, one row per
 * (consumer, assigned topic) pair.
 *
 * Iteration resumes from pShow->pIter and stops when the SDB iterator is
 * exhausted or at least `rowsCapacity` rows have been produced. A consumer's
 * rows are never split: if they would exceed `rowsCapacity`, the block is
 * asked to grow first. Consumers with no assigned topic are skipped.
 *
 * Columns written, in order: consumer id (hex string), consumer group,
 * client id, status name, topic name, create time, subscribe time,
 * rebalance time (NULL when it is 0).
 *
 * Returns the number of rows written by this call; pShow->numOfRows is
 * increased by the same amount.
 */
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
      mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    taosRLockLatch(&pConsumer->lock);
    mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    // re-read under the latch; if the list is empty now, emit one row with a NULL topic
    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = true;
    if (topicSz == 0) {
      hasTopic = false;
      topicSz = 1;
    }

    if (numOfRows + topicSz > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
    }

    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // consumer id, rendered as a hex varstr; the write is bounded by the
      // space left in the buffer after the varstr header
      char consumerIdHex[32] = {0};
      snprintf(varDataVal(consumerIdHex), sizeof(consumerIdHex) - VARSTR_HEADER_SIZE, "0x%" PRIx64,
               pConsumer->consumerId);
      varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false);

      // consumer group
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);

      // client id
      char clientId[256 + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(clientId, pConsumer->clientId);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);

      // status
      char        status[20 + VARSTR_HEADER_SIZE] = {0};
      const char *pStatusName = mndConsumerStatusName(pConsumer->status);
      STR_TO_VARSTR(status, pStatusName);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)status, false);

      // one subscribed topic
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (hasTopic) {
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
        STR_TO_VARSTR(topic, topicName);
        colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
      } else {
        colDataSetVal(pColInfo, numOfRows, NULL, true);
      }

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false);

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);

      // rebalance time; reported as NULL until a re-balance has been recorded
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);

      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);

    pBlock->info.rows = numOfRows;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

// Cancel an in-progress consumer iteration by passing `pIter` to sdbCancelFetch().
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

static const char *mndConsumerStatusName(int status) {
  switch (status) {
wmmhello's avatar
wmmhello 已提交
1182
    case MQ_CONSUMER_STATUS_READY:
L
Liu Jicong 已提交
1183
      return "ready";
wmmhello's avatar
wmmhello 已提交
1184
    case MQ_CONSUMER_STATUS_LOST:
L
Liu Jicong 已提交
1185
      return "lost";
1186
    case MQ_CONSUMER_STATUS_REBALANCE:
L
Liu Jicong 已提交
1187 1188 1189 1190 1191
      return "rebalancing";
    default:
      return "unknown";
  }
}