mndConsumer.c 40.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
L
Liu Jicong 已提交
18
#include "mndPrivilege.h"
L
Liu Jicong 已提交
19
#include "mndShow.h"
20
#include "mndSubscribe.h"
L
Liu Jicong 已提交
21 22
#include "mndTopic.h"
#include "mndTrans.h"
L
Liu Jicong 已提交
23
#include "tcompare.h"
L
Liu Jicong 已提交
24 25
#include "tname.h"

26
// Format version and reserved byte count for consumer sdb records
// (presumably consumed by the encode/decode routines of this file - verify).
#define MND_CONSUMER_VER_NUMBER   1
#define MND_CONSUMER_RESERVE_SIZE 64

// hbStatus is incremented on every mq timer tick and reset to 0 by heartbeat / ask-ep requests.
// A READY consumer whose hbStatus exceeds this many ticks is withdrawn from its current topics.
#define MND_CONSUMER_LOST_HB_CNT 6
// A LOST consumer whose hbStatus exceeds this many ticks is dropped from sdb; the timer handler
// describes this as "lost a day" (the real duration depends on the timer period - verify).
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200

// Number of in-flight rebalance executions; mndRebTryStart only succeeds when it moves 0 -> 1.
static int32_t mqRebInExecCnt = 0;
33

L
Liu Jicong 已提交
34 35
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
36 37
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
38
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
39
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
40
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42

S
Shengliang Guan 已提交
43 44
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
45
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
46 47
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
L
Liu Jicong 已提交
48
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
49
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
50

L
Liu Jicong 已提交
51
/*
 * Wire the consumer module into the mnode: register its message handlers, its show-table
 * handlers, and the sdb table describing SDB_CONSUMER rows.
 *
 * Returns the result of sdbSetTable.
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  // client requests
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);

  // internal maintenance messages; TDMT_MND_TMQ_CONSUMER_LOST is deliberately not registered
  // (its handler mndProcessConsumerLostMsg is disabled)
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // show-table support
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  SSdbTable consumerTable = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
  };
  return sdbSetTable(pMnode->pSdb, consumerTable);
}

void mndCleanupConsumer(SMnode *pMnode) {
  // nothing to release: mndInitConsumer only registers handlers and the sdb table
  (void)pMnode;
}

78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
/*
 * Request that a consumer be dropped from sdb.
 *
 * The drop is asynchronous: a TDMT_MND_TMQ_LOST_CONSUMER_CLEAR message carrying consumerId is put on the
 * write queue, where mndProcessConsumerClearMsg creates the drop transaction.
 * Failures are only logged. In the visible callers (mndProcessMqTimerMsg) the consumer is re-examined
 * on the next timer tick, so a later tick can retry.
 */
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
  if (pClearMsg == NULL) {
    mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
    return;
  }

  pClearMsg->consumerId = consumerId;
  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};

  mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId);
  if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) != 0) {
    // NOTE(review): whether pClearMsg is still owned by us after a failed put depends on the queue
    // implementation, so it is deliberately not freed here to avoid a double free.
    mError("consumer:0x%" PRIx64 " failed to put clear msg into write queue", consumerId);
  }
}

L
Liu Jicong 已提交
94
bool mndRebTryStart() {
H
Haojun Liao 已提交
95
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
C
cadem 已提交
96
  mDebug("tq timer, rebalance counter old val:%d", old);
L
Liu Jicong 已提交
97 98 99
  return old == 0;
}

X
Xiaoyu Wang 已提交
100
void mndRebEnd() {
  // finishing a rebalance is just a (floor-at-zero) decrement of the in-flight counter
  mndRebCntDec();
}
L
Liu Jicong 已提交
101

S
Shengliang Guan 已提交
102 103
void mndRebCntInc() {
  // atomically bump the in-flight rebalance counter and log the new value
  int32_t curCnt = atomic_add_fetch_32(&mqRebInExecCnt, 1);
  mInfo("rebalance trans start, rebalance counter:%d", curCnt);
}
L
Liu Jicong 已提交
106

S
Shengliang Guan 已提交
107
void mndRebCntDec() {
108 109 110 111 112 113 114 115 116 117
  while (1) {
    int32_t val = atomic_load_32(&mqRebInExecCnt);
    if (val <= 0) {
      mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
      break;
    }

    int32_t newVal = val - 1;
    int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
    if (oldVal == val) {
C
cadem 已提交
118
      mDebug("rebalance trans end, rebalance counter:%d", newVal);
119 120 121
      break;
    }
  }
S
Shengliang Guan 已提交
122
}
L
Liu Jicong 已提交
123

wmmhello's avatar
wmmhello 已提交
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
//  SMnode             *pMnode = pMsg->info.node;
//  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
//  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
//  if (pConsumer == NULL) {
//    return 0;
//  }
//
//  mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
//        mndConsumerStatusName(pConsumer->status));
//
//  if (pConsumer->status != MQ_CONSUMER_STATUS_READY) {
//    mndReleaseConsumer(pMnode, pConsumer);
//    return -1;
//  }
//
//  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
//  pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
//
//  mndReleaseConsumer(pMnode, pConsumer);
//
//  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
//  if (pTrans == NULL) {
//    goto FAIL;
//  }
//
//  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
//    goto FAIL;
//  }
//
//  if (mndTransPrepare(pMnode, pTrans) != 0) {
//    goto FAIL;
//  }
//
//  tDeleteSMqConsumerObj(pConsumerNew, true);
//  mndTransDrop(pTrans);
//  return 0;
//FAIL:
//  tDeleteSMqConsumerObj(pConsumerNew, true);
//  mndTransDrop(pTrans);
//  return -1;
//}
166

S
Shengliang Guan 已提交
167 168 169
/*
 * Handle TDMT_MND_TMQ_CONSUMER_RECOVER (enqueued by the heartbeat / ask-ep handlers for LOST consumers).
 *
 * Commits a CONSUMER_UPDATE_RECOVER update for the consumer through a transaction.
 * Returns 0 on success. Returns -1 with terrno set if the consumer is not LOST (TSDB_CODE_MND_CONSUMER_NOT_READY),
 * if allocation fails, or if the transaction cannot be prepared. Returns -1 when the consumer is missing.
 */
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
  SMqConsumerObj        *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
  if (pConsumer == NULL) {
    mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
    return -1;
  }

  int32_t status = atomic_load_32(&pConsumer->status);
  mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
        status, mndConsumerStatusName(status));

  if (status != MQ_CONSUMER_STATUS_LOST) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);  // pConsumer is not used past this point
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE_RECOVER;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
  if (pTrans == NULL) goto _over;
  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  code = 0;

_over:
  tDeleteSMqConsumerObj(pConsumerNew, true);
  mndTransDrop(pTrans);
  return code;
}

209
// todo check the clear process
L
Liu Jicong 已提交
210 211 212
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
H
Haojun Liao 已提交
213 214

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
L
Liu Jicong 已提交
215
  if (pConsumer == NULL) {
H
Haojun Liao 已提交
216
    mError("consumer:0x%"PRIx64" failed to be found to clear it", pClearMsg->consumerId);
L
Liu Jicong 已提交
217 218 219
    return 0;
  }

H
Haojun Liao 已提交
220
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
L
Liu Jicong 已提交
221 222
        mndConsumerStatusName(pConsumer->status));

wmmhello's avatar
wmmhello 已提交
223 224 225 226
//  if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
//    mndReleaseConsumer(pMnode, pConsumer);
//    return -1;
//  }
L
Liu Jicong 已提交
227 228

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
wmmhello's avatar
wmmhello 已提交
229
//  pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
L
Liu Jicong 已提交
230 231 232 233 234

  mndReleaseConsumer(pMnode, pConsumer);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  if (pTrans == NULL) goto FAIL;
H
Haojun Liao 已提交
235 236

  // this is the drop action, not the update action
L
Liu Jicong 已提交
237 238 239
  if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;

wmmhello's avatar
wmmhello 已提交
240 241
  tDeleteSMqConsumerObj(pConsumerNew, true);

L
Liu Jicong 已提交
242 243
  mndTransDrop(pTrans);
  return 0;
244

L
Liu Jicong 已提交
245
FAIL:
wmmhello's avatar
wmmhello 已提交
246 247
  tDeleteSMqConsumerObj(pConsumerNew, true);

L
Liu Jicong 已提交
248 249 250 251
  mndTransDrop(pTrans);
  return -1;
}

L
Liu Jicong 已提交
252
/*
 * Return the rebalance info stored under `key` in pHash, inserting a fresh entry when absent.
 *
 * The hash stores SMqRebInfo by value: after a successful insert it owns the entry's consumer-id arrays
 * and the temporary heap struct is freed. Returns NULL with terrno set if allocation or insertion fails.
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, keyLen);
  if (pRebInfo != NULL) {
    return pRebInfo;
  }

  SMqRebInfo *pNewInfo = tNewSMqRebSubscribe(key);
  if (pNewInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (taosHashPut(pHash, key, keyLen, pNewInfo, sizeof(SMqRebInfo)) != 0) {
    // the arrays were not handed over to the hash, so release them here instead of leaking them
    taosArrayDestroy(pNewInfo->newConsumers);
    taosArrayDestroy(pNewInfo->removedConsumers);
    taosMemoryFree(pNewInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  taosMemoryFree(pNewInfo);  // the hash holds its own copy of the struct

  return taosHashGet(pHash, key, keyLen);
}

X
Xiaoyu Wang 已提交
267 268
static void freeRebalanceItem(void *param) {
  SMqRebInfo *pInfo = param;
H
Haojun Liao 已提交
269 270 271 272
  taosArrayDestroy(pInfo->newConsumers);
  taosArrayDestroy(pInfo->removedConsumers);
}

S
Shengliang Guan 已提交
273 274
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
275 276 277 278
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

D
dapan1121 已提交
279
  mDebug("start to process mq timer");
280

281
  // rebalance cannot be parallel
L
Liu Jicong 已提交
282
  if (!mndRebTryStart()) {
D
dapan1121 已提交
283
    mDebug("mq rebalance already in progress, do nothing");
284 285 286 287
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
288
  if (pRebMsg == NULL) {
X
Xiaoyu Wang 已提交
289
    mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
290 291 292 293
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

294
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
H
Haojun Liao 已提交
295 296 297 298 299 300 301 302
  if (pRebMsg->rebSubHash == NULL) {
    mError("failed to create rebalance hashmap");
    rpcFreeCont(pRebMsg);
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
303 304 305 306

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
307 308 309
    if (pIter == NULL) {
      break;
    }
310 311 312

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
H
Haojun Liao 已提交
313

wmmhello's avatar
wmmhello 已提交
314 315
    mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
X
Xiaoyu Wang 已提交
316
           hbStatus);
L
Liu Jicong 已提交
317

wmmhello's avatar
wmmhello 已提交
318
    if (status == MQ_CONSUMER_STATUS_READY) {
wmmhello's avatar
wmmhello 已提交
319 320 321
      if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {   // unsubscribe or close
        mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
      } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
wmmhello's avatar
wmmhello 已提交
322 323 324 325 326 327 328 329
        taosRLockLatch(&pConsumer->lock);
        int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
        for (int32_t i = 0; i < topicNum; i++) {
          char  key[TSDB_SUBSCRIBE_KEY_LEN];
          char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
          mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
          SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
          taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
H
Haojun Liao 已提交
330
        }
wmmhello's avatar
wmmhello 已提交
331
        taosRUnLockLatch(&pConsumer->lock);
H
Haojun Liao 已提交
332
      }
wmmhello's avatar
wmmhello 已提交
333
    } else if (status == MQ_CONSUMER_STATUS_LOST) {
wmmhello's avatar
wmmhello 已提交
334
      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {   // clear consumer if lost a day
335
        mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
L
Liu Jicong 已提交
336
      }
337
    } else {  // MQ_CONSUMER_STATUS_REBALANCE
L
Liu Jicong 已提交
338
      taosRLockLatch(&pConsumer->lock);
339

340 341 342 343 344
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
345
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
346 347 348 349 350 351 352 353
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
354
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
355 356
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
357
      taosRUnLockLatch(&pConsumer->lock);
358 359 360 361 362 363 364 365
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
L
Liu Jicong 已提交
366
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
367 368 369 370 371 372 373
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
H
Haojun Liao 已提交
374
    mDebug("mq timer finished, no need to re-balance");
L
Liu Jicong 已提交
375
    mndRebEnd();
376 377 378 379
  }
  return 0;
}

380
/*
 * Handle a TMQ consumer heartbeat.
 *
 * Resets the consumer's missed-heartbeat counter (incremented by the mq timer). If the consumer is LOST,
 * it also enqueues a TDMT_MND_TMQ_CONSUMER_RECOVER message.
 * Returns -1 with terrno set if the request cannot be decoded or the consumer does not exist; 0 otherwise.
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  SMnode  *pMnode = pMsg->info.node;
  SMqHbReq req = {0};

  if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t         consumerId = req.consumerId;
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " not exist", consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS_LOST) {
    mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // best effort: the consumer stays LOST, so the next heartbeat will try the recovery again
      mError("consumer:0x%" PRIx64 " failed to alloc recover msg, size:%d", consumerId,
             (int32_t)sizeof(SMqConsumerRecoverMsg));
    } else {
      pRecoverMsg->consumerId = consumerId;
      SRpcMsg pRpcMsg = {
          .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
          .pCont = pRecoverMsg,
          .contLen = sizeof(SMqConsumerRecoverMsg),
      };
      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
    }
  }

  mndReleaseConsumer(pMnode, pConsumer);

  return 0;
}

S
Shengliang Guan 已提交
419
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
L
Liu Jicong 已提交
420 421 422
  SMnode     *pMnode = pMsg->info.node;
  SMqAskEpReq req = {0};
  SMqAskEpRsp rsp = {0};
D
dapan1121 已提交
423 424 425 426 427 428

  if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

L
Liu Jicong 已提交
429 430
  int64_t consumerId = req.consumerId;
  int32_t epoch = req.epoch;
431

432
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
433
  if (pConsumer == NULL) {
H
Haojun Liao 已提交
434
    mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
435 436 437 438
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

H
Haojun Liao 已提交
439 440 441 442 443
  int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
  if (ret != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
wmmhello's avatar
wmmhello 已提交
444
    goto FAIL;
H
Haojun Liao 已提交
445
  }
446

447 448 449 450
  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);
L
Liu Jicong 已提交
451

wmmhello's avatar
wmmhello 已提交
452
  if (status == MQ_CONSUMER_STATUS_LOST) {
X
Xiaoyu Wang 已提交
453
    mInfo("try to recover consumer:0x%" PRIx64, consumerId);
L
Liu Jicong 已提交
454 455 456
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));

    pRecoverMsg->consumerId = consumerId;
457
    SRpcMsg pRpcMsg = {
L
Liu Jicong 已提交
458
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
459 460 461
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
H
Haojun Liao 已提交
462

463
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
464 465
  }

wmmhello's avatar
wmmhello 已提交
466
  if (status != MQ_CONSUMER_STATUS_READY) {
X
Xiaoyu Wang 已提交
467
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
468
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
wmmhello's avatar
wmmhello 已提交
469
    goto FAIL;
470 471 472 473
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

474
  // 2. check epoch, only send ep info when epochs do not match
475 476
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
X
Xiaoyu Wang 已提交
477 478
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
          serverEpoch);
479 480 481 482 483
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
484
      taosRUnLockLatch(&pConsumer->lock);
485 486 487
      goto FAIL;
    }

H
Haojun Liao 已提交
488
    // handle all topics subscribed by this consumer
489 490 491 492
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
      // txn guarantees pSub is created
L
Liu Jicong 已提交
493

494 495 496 497 498 499 500 501
      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      strcpy(topicEp.topic, topic);

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      taosRLockLatch(&pTopic->lock);
L
Liu Jicong 已提交
502
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
503
      topicEp.schema.nCols = pTopic->schema.nCols;
L
Liu Jicong 已提交
504 505 506 507
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
      }
508 509 510 511
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      // 2.2 iterate all vg assigned to the consumer of that topic
512 513
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      int32_t        vgNum = taosArrayGetSize(pConsumerEp->vgs);
514

H
Haojun Liao 已提交
515
      // this customer assigned vgroups
516 517 518
      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
519
        taosRUnLockLatch(&pConsumer->lock);
L
Liu Jicong 已提交
520 521
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
522 523 524 525
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
526
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544
        char     offsetKey[TSDB_PARTITION_KEY_LEN];
        mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }
H
Haojun Liao 已提交
545

546
  // encode rsp
L
Liu Jicong 已提交
547
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
548 549
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
550
    terrno = TSDB_CODE_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
551
    goto FAIL;
L
Liu Jicong 已提交
552
  }
H
Haojun Liao 已提交
553

554 555 556 557 558 559 560 561
  SMqRspHead* pHead = buf;

  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  pHead->epoch = serverEpoch;
  pHead->consumerId = pConsumer->consumerId;
  pHead->walsver = 0;
  pHead->walever = 0;

S
Shengliang Guan 已提交
562

563
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
564
  tEncodeSMqAskEpRsp(&abuf, &rsp);
565 566

  // release consumer and free memory
L
Liu Jicong 已提交
567
  tDeleteSMqAskEpRsp(&rsp);
568 569 570
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
S
Shengliang Guan 已提交
571 572
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
573
  return 0;
H
Haojun Liao 已提交
574

575
FAIL:
L
Liu Jicong 已提交
576
  tDeleteSMqAskEpRsp(&rsp);
577 578 579 580
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

L
Liu Jicong 已提交
581 582 583 584 585 586 587 588
/*
 * Append a commit log to pTrans that marks pConsumer's sdb row as SDB_STATUS_DROPPED.
 * Returns 0 on success, -1 on failure.
 */
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    // assumes a failed append did not attach the raw to the transaction, so it is still ours to free
    sdbFreeRaw(pCommitRaw);
    return -1;
  }
  // from here on the transaction owns pCommitRaw
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

589 590 591 592 593 594 595 596
/*
 * Append a commit log to pTrans that writes pConsumer's sdb row with SDB_STATUS_READY.
 * Returns 0 on success, -1 on failure.
 */
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    // assumes a failed append did not attach the raw to the transaction, so it is still ours to free
    sdbFreeRaw(pCommitRaw);
    return -1;
  }
  // from here on the transaction owns pCommitRaw
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}

X
Xiaoyu Wang 已提交
597
/*
 * Check that every topic in pTopicList exists and that pUser may subscribe to it.
 * Returns 0 when all topics pass and -1 on the first failure (terrno is left as set by the callee).
 */
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
  int32_t topicCnt = taosArrayGetSize(pTopicList);

  for (int32_t idx = 0; idx < topicCnt; ++idx) {
    char        *pName = taosArrayGetP(pTopicList, idx);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pName);
    if (pTopic == NULL) {
      return -1;  // terrno has been set by mndAcquireTopic
    }

    int32_t privCode = mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic);
    mndReleaseTopic(pMnode, pTopic);
    if (privCode != 0) {
      return -1;
    }
  }

  return 0;
}

X
Xiaoyu Wang 已提交
618
// taosArrayDup element-copy callback: returns a heap copy of one topic name string.
static void *topicNameDup(void *p) {
  char *pName = (char *)p;
  return taosStrdup(pName);
}
619

X
Xiaoyu Wang 已提交
620 621
static void freeItem(void *param) {
  void *pItem = *(void **)param;
622 623 624
  if (pItem != NULL) {
    taosMemoryFree(pItem);
  }
625 626
}

627 628 629 630
/*
 * Handle a subscribe request (SCMSubscribeReq) sent by a consumer.
 *
 * The requested topic names are sorted and de-duplicated, then checked with
 * validateTopics() for the requesting user. A "subscribe" transaction is prepared:
 *  - unknown consumer: a new consumer object is created whose assigned topics and
 *    pending-new (rebalance) topics are the whole requested list;
 *  - existing consumer: it must be in the READY state; only the difference between
 *    its current topics and the requested list is queued for rebalance, and the
 *    update is marked CONSUMER_UPDATE_SUB_MODIFY.
 *
 * Returns:
 *  - TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared;
 *  - TSDB_CODE_SUCCESS when an existing consumer already subscribes exactly the
 *    requested topics (nothing to rebalance, no transaction is prepared);
 *  - the error returned by validateTopics(), or -1 (with terrno set by the failing
 *    step), on failure.
 */
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  char           *msgStr = pMsg->pCont;
  SCMSubscribeReq subscribe = {0};
  SMqConsumerObj *pExistedConsumer = NULL;
  SMqConsumerObj *pConsumerNew = NULL;
  STrans         *pTrans = NULL;
  int32_t         code = -1;

  tDeserializeSCMSubscribeReq(msgStr, &subscribe);

  int64_t consumerId = subscribe.consumerId;
  char   *cgroup = subscribe.cgroup;

  // sort + de-duplicate so the list can be merge-compared with the consumer's current topics below
  SArray *pTopicList = subscribe.topicNames;
  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);

  int32_t newTopicNum = taosArrayGetSize(pTopicList);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
  if (pTrans == NULL) {
    goto _over;
  }

  // check topic existence
  code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
  if (code != TSDB_CODE_SUCCESS) {
    goto _over;
  }

  // validateTopics() succeeded and left code == TSDB_CODE_SUCCESS. Restore the failure
  // default so every later "goto _over" is reported as an error instead of as success.
  code = -1;

  pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pExistedConsumer == NULL) {
    mInfo("receive subscribe request from new consumer:0x%" PRIx64 " cgroup:%s, numOfTopics:%d", consumerId,
          subscribe.cgroup, (int32_t)taosArrayGetSize(pTopicList));

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      goto _over;
    }
    tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));

    // updateType is intentionally not set to CONSUMER_UPDATE_SUB_MODIFY here:
    // a new consumer uses the insert logic instead of the update logic.
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    // All subscribed topics should re-balance. The request's topic list is handed over
    // to the new consumer, so detach it from the request to avoid freeing it twice below.
    taosArrayDestroy(pConsumerNew->rebNewTopics);
    pConsumerNew->rebNewTopics = pTopicList;
    subscribe.topicNames = NULL;

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  } else {
    int32_t status = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive subscribe request from existed consumer:0x%" PRIx64
          " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum);

    if (status != MQ_CONSUMER_STATUS_READY) {
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto _over;
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      goto _over;
    }

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    // Merge-walk the two sorted lists:
    //  - topics only in the current list are queued in rebRemovedTopics;
    //  - topics only in the requested list are queued in rebNewTopics.
    int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
        char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
      } else if (j >= newTopicNum) {
        char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
      } else {
        char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
        char *newTopic = taosArrayGetP(pTopicList, j);
        int   comp = strcmp(oldTopic, newTopic);
        if (comp == 0) {
          i++;
          j++;
        } else if (comp < 0) {
          char *oldTopicCopy = taosStrdup(oldTopic);
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
        } else {
          char *newTopicCopy = taosStrdup(newTopic);
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
        }
      }
    }

    // No topics need to be rebalanced: the subscription is unchanged, so report success.
    if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
      code = TSDB_CODE_SUCCESS;
      goto _over;
    }

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  mndTransDrop(pTrans);

  if (pExistedConsumer) {
    mndReleaseConsumer(pMnode, pExistedConsumer);
  }

  tDeleteSMqConsumerObj(pConsumerNew, true);

  // TODO: replace with destroy subscribe msg
  taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
  return code;
}

L
Liu Jicong 已提交
761 762
/*
 * SDB encode action: serialize a consumer object into a raw record laid out as
 *   [int32 payload length][payload from tEncodeSMqConsumerObj][MND_CONSUMER_RESERVE_SIZE reserved bytes].
 * Returns the raw record, or NULL with terrno set on failure.
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  // pessimistic default; cleared only once the whole record has been written
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void    *pPayload = NULL;
  void    *pCursor = NULL;
  int32_t  dataPos = 0;
  int32_t  payloadLen = tEncodeSMqConsumerObj(NULL, pConsumer);  // NULL target: used here only for the length
  int32_t  rawSize = sizeof(int32_t) + payloadLen + MND_CONSUMER_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);

  if (pRaw == NULL) {
    goto CM_ENCODE_OVER;
  }

  pPayload = taosMemoryMalloc(payloadLen);
  if (pPayload == NULL) {
    goto CM_ENCODE_OVER;
  }

  // encode through a separate cursor so pPayload keeps pointing at the buffer start
  pCursor = pPayload;
  tEncodeSMqConsumerObj(&pCursor, pConsumer);

  SDB_SET_INT32(pRaw, dataPos, payloadLen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pPayload, payloadLen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(pPayload);
  if (terrno != 0) {
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
  return pRaw;
}

L
Liu Jicong 已提交
797
/*
 * SDB decode action: rebuild a consumer row from a raw record written by
 * mndConsumerActionEncode ([int32 payload length][payload][reserved bytes]).
 * The soft version must equal MND_CONSUMER_VER_NUMBER. On failure terrno is
 * set, the row is freed and NULL is returned.
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *pPayload = NULL;
  int8_t          softVer = 0;
  int32_t         dataPos = 0;
  int32_t         payloadLen = 0;

  // error reporting relies on terrno: the sdb helpers set it when they fail
  terrno = 0;

  if (sdbGetRawSoftVer(pRaw, &softVer) != 0) {
    goto CM_DECODE_OVER;
  }
  if (softVer != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  SDB_GET_INT32(pRaw, dataPos, &payloadLen, CM_DECODE_OVER);

  pPayload = taosMemoryMalloc(payloadLen);
  if (pPayload == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, pPayload, payloadLen, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(pPayload, pConsumer) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

CM_DECODE_OVER:
  taosMemoryFreeClear(pPayload);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}

L
Liu Jicong 已提交
853
/*
 * SDB insert action: log the inserted consumer and stamp its subscribe time
 * with the current wall-clock time in milliseconds. Always returns 0.
 */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  int64_t consumerId = pConsumer->consumerId;

  mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", consumerId, pConsumer->cgroup,
        pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);

  pConsumer->subscribeTime = taosGetTimestampMs();
  return 0;
}

L
Liu Jicong 已提交
861
/*
 * SDB delete action: log the deletion and release the consumer's resources via
 * tDeleteSMqConsumerObj(). Always returns 0.
 */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  int32_t status = pConsumer->status;

  mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, status,
        mndConsumerStatusName(status));

  // NOTE(review): `false` presumably keeps the struct itself alive because its memory
  // belongs to the sdb row — confirm against tDeleteSMqConsumerObj
  tDeleteSMqConsumerObj(pConsumer, false);
  return 0;
}

X
Xiaoyu Wang 已提交
868
/*
 * Advance the consumer state once no rebalance work is pending (both rebNewTopics and
 * rebRemovedTopics are empty):
 *   REBALANCE -> READY
 *   READY     -> LOST
 * Any other state, or a consumer with pending topics, is left unchanged.
 * NOTE(review): the READY -> LOST transition presumably marks a consumer whose topics
 * were all removed after it stopped heartbeating — confirm with the timer/rebalance code.
 */
static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
  bool hasPendingTopics =
      taosArrayGetSize(pConsumer->rebNewTopics) != 0 || taosArrayGetSize(pConsumer->rebRemovedTopics) != 0;
  if (hasPendingTopics) {
    return;
  }

  switch (pConsumer->status) {
    case MQ_CONSUMER_STATUS_REBALANCE:
      pConsumer->status = MQ_CONSUMER_STATUS_READY;
      break;
    case MQ_CONSUMER_STATUS_READY:
      pConsumer->status = MQ_CONSUMER_STATUS_LOST;
      break;
    default:
      break;
  }
}

// remove from new topic
X
Xiaoyu Wang 已提交
881
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
882
  int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
883
  for (int32_t i = 0; i < size; i++) {
884 885 886 887 888
    char *p = taosArrayGetP(pConsumer->rebNewTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebNewTopics, i);
      taosMemoryFree(p);

wmmhello's avatar
wmmhello 已提交
889
      mInfo("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
X
Xiaoyu Wang 已提交
890
             pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics));
891 892 893 894 895 896
      break;
    }
  }
}

// remove from removed topic
X
Xiaoyu Wang 已提交
897
static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
898 899 900 901 902 903
  int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics);
  for (int32_t i = 0; i < size; i++) {
    char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebRemovedTopics, i);
      taosMemoryFree(p);
904

wmmhello's avatar
wmmhello 已提交
905
      mInfo("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d",
906
             pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics));
907 908 909 910 911
      break;
    }
  }
}

912 913 914 915 916 917 918 919
// Remove the first entry equal to pTopic from the consumer's currentTopics list and
// free it; does nothing when the topic is not in the list.
static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
  SArray *pCurrent = pConsumer->currentTopics;
  int32_t numOfTopics = taosArrayGetSize(pCurrent);

  for (int32_t k = 0; k < numOfTopics; k++) {
    char *pCandidate = taosArrayGetP(pCurrent, k);
    if (strcmp(pTopic, pCandidate) == 0) {
      taosArrayRemove(pCurrent, k);
      taosMemoryFree(pCandidate);

      mInfo("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d",
            pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pCurrent));
      return;
    }
  }
}

// Return true when pTopic (compared with strcmp) is already in pConsumer->currentTopics.
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
  int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

  for (int32_t k = 0; k < numOfTopics; k++) {
    const char *pCandidate = taosArrayGetP(pConsumer->currentTopics, k);
    if (strcmp(pCandidate, pTopic) == 0) {
      return true;
    }
  }
  return false;
}

L
Liu Jicong 已提交
942
/*
 * SDB update action for consumer rows. It merges the change described by
 * pNewConsumer->updateType into the cached pOldConsumer while holding
 * pOldConsumer's write latch:
 *  - CONSUMER_UPDATE_SUB_MODIFY: swap in the new/removed/assigned topic lists
 *    prepared by the subscribe request, stamp subscribeTime, enter REBALANCE.
 *  - CONSUMER_UPDATE_RECOVER: queue a copy of every assigned topic in
 *    rebNewTopics and enter REBALANCE.
 *  - CONSUMER_UPDATE_REB_MODIFY_NOTOPIC: bump epoch and rebalanceTime only.
 *  - CONSUMER_UPDATE_REB_MODIFY_TOPIC: move the first topic of
 *    pNewConsumer->rebNewTopics from the pending-new list into currentTopics
 *    (kept sorted), then update status, rebalanceTime and epoch.
 *  - CONSUMER_UPDATE_REB_MODIFY_REMOVE: drop the first topic of
 *    pNewConsumer->rebRemovedTopics from the pending-removal and current lists,
 *    then update status, rebalanceTime and epoch.
 * Always returns 0.
 */
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);

  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB_MODIFY) {
    // swap pointers: the old consumer takes the new lists, the previous lists end up in pNewConsumer
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);

    pOldConsumer->subscribeTime = taosGetTimestampMs();
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
    mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_RECOVER) {
    // every assigned topic has to be rebalanced again
    int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
      taosArrayPush(pOldConsumer->rebNewTopics, &topic);
    }

    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
    mInfo("consumer:0x%" PRIx64 " timer update, timer recover", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_NOTOPIC) {
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_TOPIC) {
    char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));

    // the topic is no longer pending: drop it from the pending-new topic list
    removeFromNewTopicList(pOldConsumer, pNewTopic);

    // add to current topic list, keeping it sorted
    if (existInCurrentTopicList(pOldConsumer, pNewTopic)) {
      mError("consumer:0x%" PRIx64 " new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
      taosMemoryFree(pNewTopic);
    } else {
      // ownership of pNewTopic moves into currentTopics
      taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
    }

    // set status
    int32_t status = pOldConsumer->status;
    updateConsumerStatus(pOldConsumer);

    // record when this rebalance step was applied
    pOldConsumer->rebalanceTime = taosGetTimestampMs();

    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
    mInfo("consumer:0x%" PRIx64 " reb update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_REMOVE) {
    // borrowed pointer: it stays owned by pNewConsumer->rebRemovedTopics
    char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);

    // remove from removed topic
    removeFromRemoveTopicList(pOldConsumer, removedTopic);

    // remove from current topic
    removeFromCurrentTopicList(pOldConsumer, removedTopic);

    // set status
    int32_t status = pOldConsumer->status;
    updateConsumerStatus(pOldConsumer);

    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("consumer:0x%" PRIx64 " reb update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
  }

  taosWUnLockLatch(&pOldConsumer->lock);

  return 0;
}

L
Liu Jicong 已提交
1041
/*
 * Acquire a reference to the consumer with the given id from the sdb.
 * Returns NULL and sets terrno to TSDB_CODE_MND_CONSUMER_NOT_EXIST when it is not found.
 * A non-NULL result must be released with mndReleaseConsumer().
 */
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerObj *pConsumer = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (pConsumer != NULL) {
    return pConsumer;
  }

  terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
1050
/* Release a consumer reference obtained from mndAcquireConsumer(). */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}
L
Liu Jicong 已提交
1054

S
Shengliang Guan 已提交
1055 1056
/*
 * Fill pBlock with rows for the consumer "show" command: one row per
 * (consumer, assigned topic) pair. Consumers without assigned topics are skipped.
 * Iteration resumes from pShow->pIter, so a later call continues where this one stopped.
 *
 * Columns: consumer id (hex), consumer group, client id, status, topic,
 *          up time (createTime), subscribe time, rebalance time (NULL when 0).
 *
 * Returns the number of rows written by this call; pShow->numOfRows is advanced
 * by the same amount.
 */
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
      mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    taosRLockLatch(&pConsumer->lock);
    mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    // The topic list is read again under the latch. If it became empty in between,
    // still emit a single row whose topic column is NULL.
    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = true;
    if (topicSz == 0) {
      hasTopic = false;
      topicSz = 1;
    }

    // one consumer may produce more rows than the remaining capacity
    if (numOfRows + topicSz > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
    }

    // Columns that are identical for every topic row of this consumer are encoded once.
    // consumer id, rendered as a bounded hex string
    char consumerIdHex[32] = {0};
    snprintf(varDataVal(consumerIdHex), sizeof(consumerIdHex) - VARSTR_HEADER_SIZE, "0x%" PRIx64,
             pConsumer->consumerId);
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

    // consumer group
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(cgroup, pConsumer->cgroup);

    // client id
    char clientId[256 + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(clientId, pConsumer->clientId);

    // status
    char        status[20 + VARSTR_HEADER_SIZE] = {0};
    const char *pStatusName = mndConsumerStatusName(pConsumer->status);
    STR_TO_VARSTR(status, pStatusName);

    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)status, false);

      // one subscribed topic
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (hasTopic) {
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
        STR_TO_VARSTR(topic, topicName);
        colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
      } else {
        colDataSetVal(pColInfo, numOfRows, NULL, true);
      }

      // end point column is not shown at present

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false);

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);

      // rebalance time (NULL until the first rebalance has happened)
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);

      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);

    pBlock->info.rows = numOfRows;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

/* Abort an in-progress consumer iteration started by mndRetrieveConsumer(). */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

static const char *mndConsumerStatusName(int status) {
  switch (status) {
wmmhello's avatar
wmmhello 已提交
1168
    case MQ_CONSUMER_STATUS_READY:
L
Liu Jicong 已提交
1169
      return "ready";
wmmhello's avatar
wmmhello 已提交
1170
    case MQ_CONSUMER_STATUS_LOST:
L
Liu Jicong 已提交
1171
      return "lost";
1172
    case MQ_CONSUMER_STATUS_REBALANCE:
L
Liu Jicong 已提交
1173 1174 1175 1176 1177
      return "rebalancing";
    default:
      return "unknown";
  }
}