mndConsumer.c 40.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
L
Liu Jicong 已提交
18
#include "mndPrivilege.h"
L
Liu Jicong 已提交
19
#include "mndShow.h"
20
#include "mndSubscribe.h"
L
Liu Jicong 已提交
21 22
#include "mndTopic.h"
#include "mndTrans.h"
L
Liu Jicong 已提交
23
#include "tcompare.h"
L
Liu Jicong 已提交
24 25
#include "tname.h"

26
#define MND_CONSUMER_VER_NUMBER   1
L
Liu Jicong 已提交
27 28
#define MND_CONSUMER_RESERVE_SIZE 64

H
Haojun Liao 已提交
29
// A READY consumer is reported as lost once its hbStatus counter exceeds this value. The counter is
// incremented on every mq timer pass and reset to 0 by heartbeat / ask-ep requests.
#define MND_CONSUMER_LOST_HB_CNT          6
L
Liu Jicong 已提交
30
// A LOST_REBD consumer is cleared once its hbStatus counter (one increment per mq timer pass) exceeds
// this value; the timer handler describes this as being lost "longer than one day".
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
31

H
Haojun Liao 已提交
32
// Rebalance-in-progress counter, accessed only through atomic operations: mndRebTryStart() moves it
// from 0 to 1, mndRebCntInc() increments it and mndRebCntDec() decrements it (never below 0).
static int32_t mqRebInExecCnt = 0;
33

L
Liu Jicong 已提交
34 35
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
36 37
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
38
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
39
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
40
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42

S
Shengliang Guan 已提交
43 44
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
45
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
46 47
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
L
Liu Jicong 已提交
48
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
49
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
50

L
Liu Jicong 已提交
51
/**
 * Wire the consumer module into the mnode: install the TMQ message handlers,
 * the "show consumers" retrieve / free-iterator callbacks, and finally the
 * SDB table descriptor for consumer objects.
 *
 * @param pMnode mnode instance being initialised
 * @return whatever sdbSetTable() returns for the consumer table
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  // SDB descriptor for consumer objects (key type SDB_KEY_INT64)
  SSdbTable consumerTable = {0};
  consumerTable.sdbType = SDB_CONSUMER;
  consumerTable.keyType = SDB_KEY_INT64;
  consumerTable.encodeFp = (SdbEncodeFp)mndConsumerActionEncode;
  consumerTable.decodeFp = (SdbDecodeFp)mndConsumerActionDecode;
  consumerTable.insertFp = (SdbInsertFp)mndConsumerActionInsert;
  consumerTable.updateFp = (SdbUpdateFp)mndConsumerActionUpdate;
  consumerTable.deleteFp = (SdbDeleteFp)mndConsumerActionDelete;

  // message handlers
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // "show consumers" support
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  return sdbSetTable(pMnode->pSdb, consumerTable);
}

/**
 * Counterpart of mndInitConsumer(); the consumer module currently has nothing to tear down.
 *
 * @param pMnode mnode instance (unused)
 */
void mndCleanupConsumer(SMnode *pMnode) {
  (void)pMnode;
}

L
Liu Jicong 已提交
78
bool mndRebTryStart() {
H
Haojun Liao 已提交
79
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
C
cadem 已提交
80
  mDebug("tq timer, rebalance counter old val:%d", old);
L
Liu Jicong 已提交
81 82 83
  return old == 0;
}

X
Xiaoyu Wang 已提交
84
/** Mark the end of a rebalance round by decrementing the in-progress counter. */
void mndRebEnd() {
  mndRebCntDec();
}
L
Liu Jicong 已提交
85

S
Shengliang Guan 已提交
86 87
/** Atomically increment the rebalance-in-progress counter and log the resulting value. */
void mndRebCntInc() {
  const int32_t current = atomic_add_fetch_32(&mqRebInExecCnt, 1);
  mInfo("rebalance trans start, rebalance counter:%d", current);
}
L
Liu Jicong 已提交
90

S
Shengliang Guan 已提交
91
void mndRebCntDec() {
92 93 94 95 96 97 98 99 100 101
  while (1) {
    int32_t val = atomic_load_32(&mqRebInExecCnt);
    if (val <= 0) {
      mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
      break;
    }

    int32_t newVal = val - 1;
    int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
    if (oldVal == val) {
C
cadem 已提交
102
      mDebug("rebalance trans end, rebalance counter:%d", newVal);
103 104 105
      break;
    }
  }
S
Shengliang Guan 已提交
106
}
L
Liu Jicong 已提交
107

S
Shengliang Guan 已提交
108 109 110
/**
 * Handle TDMT_MND_TMQ_CONSUMER_LOST: commit a CONSUMER_UPDATE__LOST update for the consumer named
 * in the message.
 *
 * @param pMsg rpc message whose pCont is an SMqConsumerLostMsg
 * @return 0 when the consumer no longer exists or the transaction was prepared;
 *         -1 when the consumer is not in READY status, on allocation failure, or when any
 *         transaction step fails
 */
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
  SMnode             *pMnode = pMsg->info.node;
  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
  if (pConsumer == NULL) {
    return 0;
  }

  mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  // only the id and cgroup are needed from the sdb object; release it as soon as they are copied
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    // previously dereferenced unconditionally; bail out instead of crashing
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
  if (pTrans == NULL) {
    goto _over;
  }

  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
    goto _over;
  }

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    goto _over;
  }

  code = 0;

_over:
  // identical cleanup on success and failure
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
153 154 155
/**
 * Handle TDMT_MND_TMQ_CONSUMER_RECOVER: commit a CONSUMER_UPDATE__RECOVER update for the consumer
 * named in the message.
 *
 * @param pMsg rpc message whose pCont is an SMqConsumerRecoverMsg
 * @return 0 when the transaction was prepared; -1 when the consumer cannot be found, is not in
 *         LOST_REBD status (terrno = TSDB_CODE_MND_CONSUMER_NOT_READY), on allocation failure, or
 *         when any transaction step fails
 */
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
  SMqConsumerObj        *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
  if (pConsumer == NULL) {
    mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
    return -1;
  }

  mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
        pConsumer->status, mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  // only the id and cgroup are needed from the sdb object; release it as soon as they are copied
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    // previously dereferenced unconditionally; bail out instead of crashing
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
  if (pTrans == NULL) {
    goto _over;
  }

  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;

  code = 0;

_over:
  // identical cleanup on success and failure
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

195
// todo check the clear process
L
Liu Jicong 已提交
196 197 198
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
H
Haojun Liao 已提交
199 200

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
L
Liu Jicong 已提交
201
  if (pConsumer == NULL) {
H
Haojun Liao 已提交
202
    mError("consumer:0x%"PRIx64" failed to be found to clear it", pClearMsg->consumerId);
L
Liu Jicong 已提交
203 204 205
    return 0;
  }

H
Haojun Liao 已提交
206
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
L
Liu Jicong 已提交
207 208 209 210 211 212 213 214 215 216 217 218 219 220
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  mndReleaseConsumer(pMnode, pConsumer);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  if (pTrans == NULL) goto FAIL;
H
Haojun Liao 已提交
221 222

  // this is the drop action, not the update action
L
Liu Jicong 已提交
223 224 225 226 227 228 229
  if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;

  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return 0;
230

L
Liu Jicong 已提交
231 232 233 234 235 236 237
FAIL:
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return -1;
}

L
Liu Jicong 已提交
238
/**
 * Look up the rebalance info stored under `key`, inserting a fresh entry when none exists yet.
 *
 * A new entry is built with tNewSMqRebSubscribe(), copied into the hash by value
 * (sizeof(SMqRebInfo) bytes) and the temporary is freed; the pointer handed back is the one
 * obtained from the hash afterwards.
 *
 * @param pHash hash keyed by the subscribe-key string (terminating NUL included in the key length)
 * @param key   NUL-terminated subscribe key
 * @return pointer to the entry inside the hash, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY when the
 *         new entry could not be allocated; also NULL if the final lookup finds nothing)
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  const size_t keyLen = strlen(key) + 1;

  SMqRebInfo *pExisting = taosHashGet(pHash, key, keyLen);
  if (pExisting != NULL) {
    return pExisting;
  }

  SMqRebInfo *pFresh = tNewSMqRebSubscribe(key);
  if (pFresh == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosHashPut(pHash, key, keyLen, pFresh, sizeof(SMqRebInfo));
  taosMemoryFree(pFresh);

  return taosHashGet(pHash, key, keyLen);
}

X
Xiaoyu Wang 已提交
253 254
static void freeRebalanceItem(void *param) {
  SMqRebInfo *pInfo = param;
H
Haojun Liao 已提交
255 256 257 258
  taosArrayDestroy(pInfo->newConsumers);
  taosArrayDestroy(pInfo->removedConsumers);
}

S
Shengliang Guan 已提交
259 260
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
261 262 263 264
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

D
dapan1121 已提交
265
  mDebug("start to process mq timer");
266

267
  // rebalance cannot be parallel
L
Liu Jicong 已提交
268
  if (!mndRebTryStart()) {
D
dapan1121 已提交
269
    mDebug("mq rebalance already in progress, do nothing");
270 271 272 273
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
274
  if (pRebMsg == NULL) {
X
Xiaoyu Wang 已提交
275
    mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
276 277 278 279
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

280
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
H
Haojun Liao 已提交
281 282 283 284 285 286 287 288
  if (pRebMsg->rebSubHash == NULL) {
    mError("failed to create rebalance hashmap");
    rpcFreeCont(pRebMsg);
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
289 290 291 292

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
293 294 295
    if (pIter == NULL) {
      break;
    }
296 297 298

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
H
Haojun Liao 已提交
299

X
Xiaoyu Wang 已提交
300 301 302
    mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", uptime:%" PRId64 ", hbstatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime,
           hbStatus);
L
Liu Jicong 已提交
303 304

    if (status == MQ_CONSUMER_STATUS__READY) {
H
Haojun Liao 已提交
305 306
      if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
        SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
H
Haojun Liao 已提交
307 308
        if (pLostMsg == NULL) {
          mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d",
H
Haojun Liao 已提交
309
              pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg));
H
Haojun Liao 已提交
310 311
          continue;
        }
H
Haojun Liao 已提交
312 313 314

        pLostMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
315
            .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)};
316

317 318
        mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId,
            MND_CONSUMER_LOST_HB_CNT);
H
Haojun Liao 已提交
319 320
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
L
Liu Jicong 已提交
321
    } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
H
Haojun Liao 已提交
322
      // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
L
Liu Jicong 已提交
323 324
      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
        SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
H
Haojun Liao 已提交
325 326
        if (pClearMsg == NULL) {
          mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d",
H
Haojun Liao 已提交
327
                 pConsumer->consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
H
Haojun Liao 已提交
328 329
          continue;
        }
L
Liu Jicong 已提交
330 331 332

        pClearMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
333
            .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};
334

335 336
        mDebug("consumer:0x%" PRIx64 " lost beyond threshold %d, clear it", pConsumer->consumerId,
               MND_CONSUMER_LOST_CLEAR_THRESHOLD);
L
Liu Jicong 已提交
337 338
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
339
    } else if (status == MQ_CONSUMER_STATUS__LOST) {
L
Liu Jicong 已提交
340
      taosRLockLatch(&pConsumer->lock);
341 342 343 344 345
      int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
      for (int32_t i = 0; i < topicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
346
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
347 348
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
349
      taosRUnLockLatch(&pConsumer->lock);
350
    } else {  // MQ_CONSUMER_STATUS_REBALANCE
L
Liu Jicong 已提交
351
      taosRLockLatch(&pConsumer->lock);
352

353 354 355 356 357
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
358
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
359 360 361 362 363 364 365 366
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
367
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
368 369
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
370
      taosRUnLockLatch(&pConsumer->lock);
371 372 373 374 375 376 377 378
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
L
Liu Jicong 已提交
379
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
380 381 382 383 384 385 386
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
H
Haojun Liao 已提交
387
    mDebug("mq timer finished, no need to re-balance");
L
Liu Jicong 已提交
388
    mndRebEnd();
389 390 391 392
  }
  return 0;
}

393
/**
 * Handle a consumer heartbeat: reset the consumer's hbStatus counter and, when the consumer is in
 * LOST_REBD status, queue a TDMT_MND_TMQ_CONSUMER_RECOVER message for it.
 *
 * @param pMsg rpc message carrying a serialized SMqHbReq
 * @return 0 on success; -1 with terrno set when the request cannot be deserialized, the consumer
 *         does not exist (TSDB_CODE_MND_CONSUMER_NOT_EXIST) or the recover message cannot be
 *         allocated (TSDB_CODE_OUT_OF_MEMORY)
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  SMnode  *pMnode = pMsg->info.node;
  SMqHbReq req = {0};

  if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t         consumerId = req.consumerId;
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " not exist", consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  // the heartbeat itself: the mq timer increments this counter, any sign of life resets it
  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // previously dereferenced unconditionally
      mError("consumer:0x%" PRIx64 " failed to create recover msg due to out of memory", consumerId);
      mndReleaseConsumer(pMnode, pConsumer);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    pRecoverMsg->consumerId = consumerId;
    SRpcMsg pRpcMsg = {
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
  }

  mndReleaseConsumer(pMnode, pConsumer);

  return 0;
}

S
Shengliang Guan 已提交
432
/**
 * Handle a consumer "ask endpoint" request.
 *
 * Resets the consumer's hbStatus counter, queues a recover message when the consumer is in
 * LOST_REBD status, and - for a READY consumer - answers with an SMqAskEpRsp. The response lists
 * topics, their schema and the vgroups assigned to this consumer only when the client's epoch
 * differs from the server's; otherwise only the response head is filled.
 *
 * Every exit taken after the consumer has been acquired releases it again; the earlier code
 * returned without releasing on cgroup mismatch, on not-ready status and on response buffer
 * allocation failure.
 *
 * @param pMsg rpc message carrying a serialized SMqAskEpReq; on success info.rsp / info.rspLen
 *             receive the encoded response
 * @return 0 on success, -1 with terrno set on failure
 */
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  SMnode     *pMnode = pMsg->info.node;
  SMqAskEpReq req = {0};
  SMqAskEpRsp rsp = {0};

  if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t consumerId = req.consumerId;
  int32_t epoch = req.epoch;

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
  if (ret != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);

#if 1
  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer:0x%" PRIx64, consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // previously dereferenced unconditionally
      mError("consumer:0x%" PRIx64 " failed to create recover msg due to out of memory", consumerId);
      mndReleaseConsumer(pMnode, pConsumer);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    pRecoverMsg->consumerId = consumerId;
    SRpcMsg pRpcMsg = {
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };

    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
  }
#endif

  if (status != MQ_CONSUMER_STATUS__READY) {
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

  // 2. check epoch, only send ep info when epochs do not match
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
          serverEpoch);
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosRUnLockLatch(&pConsumer->lock);
      goto FAIL;
    }

    // handle all topics subscribed by this consumer
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
      // txn guarantees pSub is created

      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      strcpy(topicEp.topic, topic);

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      taosRLockLatch(&pTopic->lock);
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
      topicEp.schema.nCols = pTopic->schema.nCols;
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        if (topicEp.schema.pSchema == NULL) {
          // the copy below would write through NULL; unwind every lock/reference taken so far
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          taosRUnLockLatch(&pTopic->lock);
          mndReleaseTopic(pMnode, pTopic);
          taosRUnLockLatch(&pSub->lock);
          mndReleaseSubscribe(pMnode, pSub);
          taosRUnLockLatch(&pConsumer->lock);
          goto FAIL;
        }
        memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
      }
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      // 2.2 iterate all vg assigned to the consumer of that topic
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      int32_t        vgNum = taosArrayGetSize(pConsumerEp->vgs);

      // this customer assigned vgroups
      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        // topicEp has not been pushed into rsp.topics yet, so its schema copy must be freed here
        if (topicEp.schema.pSchema != NULL) {
          taosMemoryFree(topicEp.schema.pSchema);
        }
        taosRUnLockLatch(&pConsumer->lock);
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
        char     offsetKey[TSDB_PARTITION_KEY_LEN];
        mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }

  // encode rsp
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // go through FAIL so that rsp and the consumer reference are not leaked
    goto FAIL;
  }

  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = serverEpoch;
  ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;

  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  tEncodeSMqAskEpRsp(&abuf, &rsp);

  // release consumer and free memory
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
  return 0;

FAIL:
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

L
Liu Jicong 已提交
591 592 593 594 595 596 597 598
/**
 * Encode `pConsumer` and append it to `pTrans` as a commit log marked SDB_STATUS_DROPPED.
 *
 * @param pMnode    mnode instance (not used by this function)
 * @param pTrans    transaction receiving the commit log
 * @param pConsumer consumer object to encode
 * @return 0 on success, -1 when encoding, appending or setting the raw status fails
 */
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  if (pRaw == NULL) {
    return -1;
  }

  if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    return -1;
  }

  return (sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) ? -1 : 0;
}

599 600 601 602 603 604 605 606
/**
 * Encode `pConsumer` and append it to `pTrans` as a commit log marked SDB_STATUS_READY.
 *
 * @param pMnode    mnode instance (not used by this function)
 * @param pTrans    transaction receiving the commit log
 * @param pConsumer consumer object to encode
 * @return 0 on success, -1 when encoding, appending or setting the raw status fails
 */
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  if (pRaw == NULL) {
    return -1;
  }

  if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    return -1;
  }

  return (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) ? -1 : 0;
}

X
Xiaoyu Wang 已提交
607
/**
 * Check that every topic named in `pTopicList` exists and that `pUser` passes the
 * MND_OPER_SUBSCRIBE privilege check on it.
 *
 * @param pTopicList array of topic-name pointers
 * @param pMnode     mnode instance
 * @param pUser      user name the privilege check is made for
 * @return 0 when all topics pass; -1 at the first topic that cannot be acquired or fails the
 *         privilege check
 */
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
  const int32_t total = taosArrayGetSize(pTopicList);

  for (int32_t idx = 0; idx < total; ++idx) {
    char *topicName = taosArrayGetP(pTopicList, idx);

    SMqTopicObj *pTopicObj = mndAcquireTopic(pMnode, topicName);
    if (pTopicObj == NULL) {
      // terrno has been set by callee function
      return -1;
    }

    // the topic reference is dropped whatever the outcome of the check
    const int32_t privCode = mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopicObj);
    mndReleaseTopic(pMnode, pTopicObj);

    if (privCode != 0) {
      return -1;
    }
  }

  return 0;
}

X
Xiaoyu Wang 已提交
628
/**
 * Element duplicator for topic-name arrays (passed to taosArrayDup()).
 *
 * @param p topic name string
 * @return the result of taosStrdup() on that string
 */
static void *topicNameDup(void *p) {
  char *topicName = (char *)p;
  return taosStrdup(topicName);
}
629

X
Xiaoyu Wang 已提交
630 631
// Element-release callback: param points at a slot holding a pointer; the pointed-to
// memory is freed when the slot is not NULL.
static void freeItem(void *param) {
  void **ppItem = (void **)param;
  if (*ppItem == NULL) {
    return;
  }

  taosMemoryFree(*ppItem);
}

637 638 639 640
// Handle a consumer subscribe request.
//
// The request is deserialized from pMsg->pCont, the requested topic names are sorted and
// de-duplicated, validated with validateTopics(), and a "subscribe" transaction is prepared
// that carries a new SMqConsumerObj with updateType CONSUMER_UPDATE__REBALANCE:
//   - consumer id not found: every requested topic goes into rebNewTopics;
//   - consumer found with status READY: the requested list is merged against currentTopics
//     to build rebNewTopics / rebRemovedTopics.
//
// Returns:
//   TSDB_CODE_ACTION_IN_PROGRESS  the transaction was prepared;
//   TSDB_CODE_SUCCESS             an existing consumer's topic set is unchanged (nothing to do);
//   validateTopics() result       when topic validation fails;
//   -1                            on any other failure (terrno is expected to carry the reason).
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  char   *msgStr = pMsg->pCont;

  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);

  uint64_t        consumerId = subscribe.consumerId;
  char           *cgroup = subscribe.cgroup;
  SMqConsumerObj *pExistedConsumer = NULL;
  SMqConsumerObj *pConsumerNew = NULL;

  int32_t code = -1;
  SArray *pTopicList = subscribe.topicNames;
  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);

  int32_t newTopicNum = taosArrayGetSize(pTopicList);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
  if (pTrans == NULL) {
    goto _over;
  }

  // check topic existence and subscribe privilege
  code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
  if (code != TSDB_CODE_SUCCESS) {
    goto _over;
  }

  // validateTopics() left code == 0; restore the failure default so that every error path
  // below reports a failure instead of silently returning success
  code = -1;

  pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pExistedConsumer == NULL) {
    mInfo("receive subscribe request from new consumer:0x%" PRIx64 " cgroup:%s, numOfTopics:%d", consumerId,
          subscribe.cgroup, (int32_t)taosArrayGetSize(pTopicList));

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      // NOTE(review): terrno is presumably set by tNewSMqConsumerObj - confirm
      goto _over;
    }

    tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    // all subscribed topics should re-balance.
    taosArrayDestroy(pConsumerNew->rebNewTopics);
    pConsumerNew->rebNewTopics = pTopicList;
    // the topic list now belongs to pConsumerNew; clear it so it is not destroyed twice at _over
    subscribe.topicNames = NULL;

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;

  } else {
    int32_t status = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive subscribe request from existed consumer:0x%" PRIx64
          " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum);

    if (status != MQ_CONSUMER_STATUS__READY) {
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto _over;
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      goto _over;
    }

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    int32_t oldTopicNum = (pExistedConsumer->currentTopics) ? taosArrayGetSize(pExistedConsumer->currentTopics) : 0;

    // Two-pointer merge of the old (current) and new (requested) topic lists; it relies on
    // both lists being sorted in strcmp order. Names only in the old list are to be removed,
    // names only in the new list are to be added, names in both need no change.
    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
        char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
        continue;
      } else if (j >= newTopicNum) {
        char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
        continue;
      } else {
        char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
        char *newTopic = taosArrayGetP(pTopicList, j);
        int   comp = strcmp(oldTopic, newTopic);
        if (comp == 0) {
          i++;
          j++;
          continue;
        } else if (comp < 0) {
          char *oldTopicCopy = taosStrdup(oldTopic);
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
          continue;
        } else {
          char *newTopicCopy = taosStrdup(newTopic);
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
          continue;
        }
      }
    }

    // no topics need to be rebalanced: nothing to commit, report success
    if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
      code = TSDB_CODE_SUCCESS;
      goto _over;
    }

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  mndTransDrop(pTrans);

  if (pExistedConsumer) {
    mndReleaseConsumer(pMnode, pExistedConsumer);
  }

  if (pConsumerNew) {
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
  }

  // TODO: replace with destroy subscribe msg
  taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
  return code;
}

L
Liu Jicong 已提交
775 776
// Serialize pConsumer into a newly allocated SDB raw record laid out as:
//   int32 payload length | encoded consumer payload | MND_CONSUMER_RESERVE_SIZE reserved bytes
// Returns the raw record, or NULL on failure (the partially built raw is freed and terrno
// holds the error).
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  void   *payload = NULL;
  void   *cursor = NULL;
  int32_t dataPos = 0;

  // failure is the default; terrno is only cleared after every field has been written
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // a first encode pass with a NULL buffer yields the payload length
  int32_t payloadLen = tEncodeSMqConsumerObj(NULL, pConsumer);
  int32_t rawSize = sizeof(int32_t) + payloadLen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    goto _encode_done;
  }

  payload = taosMemoryMalloc(payloadLen);
  if (payload == NULL) {
    goto _encode_done;
  }

  // the encoder advances the pointer it is given, so hand it a copy
  cursor = payload;
  tEncodeSMqConsumerObj(&cursor, pConsumer);

  SDB_SET_INT32(pRaw, dataPos, payloadLen, _encode_done);
  SDB_SET_BINARY(pRaw, dataPos, payload, payloadLen, _encode_done);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, _encode_done);
  SDB_SET_DATALEN(pRaw, dataPos, _encode_done);

  terrno = TSDB_CODE_SUCCESS;

_encode_done:
  taosMemoryFreeClear(payload);

  if (terrno == 0) {
    mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
    return pRaw;
  }

  mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

L
Liu Jicong 已提交
811
// Rebuild a consumer row from an SDB raw record written by the encoder:
//   int32 payload length | encoded consumer payload | MND_CONSUMER_RESERVE_SIZE reserved bytes
// Returns the decoded row, or NULL when the version does not match, memory cannot be
// allocated, or any field cannot be read/decoded (terrno holds the error).
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *buf = NULL;

  // The cleanup section decides success or failure from terrno. Clear it up front so that a
  // stale error left by an earlier operation cannot turn a successful decode into a failure.
  terrno = TSDB_CODE_SUCCESS;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }

  if (sver != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  int32_t dataPos = 0;
  int32_t len = 0;
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

CM_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    // pRow becomes NULL here, so the caller sees the failure through the return value
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}

L
Liu Jicong 已提交
866
// SDB insert action for a consumer: logs the consumer and initializes subscribeTime from upTime.
// pSdb is not used. Always returns 0.
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *statusName = mndConsumerStatusName(pConsumer->status);

  mDebug("consumer:0x%" PRIx64 " cgroup:%s status:%d(%s) epoch:%d load from sdb, perform insert action",
         pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, statusName, pConsumer->epoch);

  pConsumer->subscribeTime = pConsumer->upTime;
  return 0;
}

L
Liu Jicong 已提交
874
// SDB delete action for a consumer: logs the consumer and releases its contents through
// tDeleteSMqConsumerObj(). pSdb is not used. Always returns 0.
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *statusName = mndConsumerStatusName(pConsumer->status);

  mDebug("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
         statusName);

  tDeleteSMqConsumerObj(pConsumer);
  return 0;
}

X
Xiaoyu Wang 已提交
881
// Advance the consumer status once both pending re-balance lists are empty:
//   REBALANCE -> READY, LOST -> LOST_REBD. Any other status is left untouched.
static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
  // nothing changes while topics are still waiting to be added or removed
  if (taosArrayGetSize(pConsumer->rebNewTopics) != 0 || taosArrayGetSize(pConsumer->rebRemovedTopics) != 0) {
    return;
  }

  switch (pConsumer->status) {
    case MQ_CONSUMER_STATUS_REBALANCE:
      pConsumer->status = MQ_CONSUMER_STATUS__READY;
      break;
    case MQ_CONSUMER_STATUS__LOST:
      ASSERT(taosArrayGetSize(pConsumer->currentTopics) == 0);
      pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
      break;
    default:
      break;
  }
}

// remove from new topic
X
Xiaoyu Wang 已提交
895
// Remove the first entry equal to pTopic from pConsumer->rebNewTopics and free that entry.
// Does nothing when no entry matches.
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
  int32_t total = taosArrayGetSize(pConsumer->rebNewTopics);
  int32_t idx = 0;

  // locate the first matching name
  while (idx < total && strcmp(pTopic, taosArrayGetP(pConsumer->rebNewTopics, idx)) != 0) {
    idx++;
  }

  if (idx >= total) {
    return;
  }

  char *matched = taosArrayGetP(pConsumer->rebNewTopics, idx);
  taosArrayRemove(pConsumer->rebNewTopics, idx);
  taosMemoryFree(matched);

  mDebug("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
         pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics));
}

// remove from removed topic
X
Xiaoyu Wang 已提交
911
// Remove the first entry equal to pTopic from pConsumer->rebRemovedTopics and free that entry.
// Does nothing when no entry matches.
static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
  int32_t total = taosArrayGetSize(pConsumer->rebRemovedTopics);
  int32_t idx = 0;

  // locate the first matching name
  while (idx < total && strcmp(pTopic, taosArrayGetP(pConsumer->rebRemovedTopics, idx)) != 0) {
    idx++;
  }

  if (idx >= total) {
    return;
  }

  char *matched = taosArrayGetP(pConsumer->rebRemovedTopics, idx);
  taosArrayRemove(pConsumer->rebRemovedTopics, idx);
  taosMemoryFree(matched);

  mDebug("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d",
         pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics));
}

// Remove the first entry equal to pTopic from pConsumer->currentTopics and free that entry.
// Does nothing when no entry matches.
static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
  int32_t total = taosArrayGetSize(pConsumer->currentTopics);
  int32_t idx = 0;

  // locate the first matching name
  while (idx < total && strcmp(pTopic, taosArrayGetP(pConsumer->currentTopics, idx)) != 0) {
    idx++;
  }

  if (idx >= total) {
    return;
  }

  char *matched = taosArrayGetP(pConsumer->currentTopics, idx);
  taosArrayRemove(pConsumer->currentTopics, idx);
  taosMemoryFree(matched);

  mDebug("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d",
         pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics));
}

// Report whether pTopic is present (exact strcmp match) in pConsumer->currentTopics.
static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* pTopic) {
  int32_t total = taosArrayGetSize(pConsumer->currentTopics);

  for (int32_t idx = 0; idx < total; idx++) {
    char *candidate = taosArrayGetP(pConsumer->currentTopics, idx);
    if (strcmp(candidate, pTopic) == 0) {
      return true;
    }
  }

  return false;
}

L
Liu Jicong 已提交
956
// SDB update action for a consumer. pNewConsumer->updateType selects how pOldConsumer is
// modified; all modifications happen under pOldConsumer's write latch:
//   REBALANCE  swap in the new rebNewTopics / rebRemovedTopics / assignedTopics lists
//   LOST       queue every current topic for removal and mark the consumer lost
//   RECOVER    queue every assigned topic for (re-)adding and mark the consumer rebalancing
//   TOUCH      bump the epoch
//   ADD        move the first topic of pNewConsumer->rebNewTopics into currentTopics
//   REMOVE     drop the first topic of pNewConsumer->rebRemovedTopics from the pending-removal
//              and current lists
// pSdb is not used. Always returns 0.
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64,
         pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime);

  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE__REBALANCE) {
    // the old lists end up in pNewConsumer after the swaps
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);

    pOldConsumer->subscribeTime = pNewConsumer->upTime;
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;

  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
    // every topic currently consumed has to be taken away from a lost consumer
    int32_t numOfCurrent = taosArrayGetSize(pOldConsumer->currentTopics);
    int32_t idx = 0;
    while (idx < numOfCurrent) {
      char *topicCopy = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, idx));
      taosArrayPush(pOldConsumer->rebRemovedTopics, &topicCopy);
      idx++;
    }

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    int32_t statusBefore = pOldConsumer->status;
    pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
    mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
           pOldConsumer->consumerId, mndConsumerStatusName(statusBefore), mndConsumerStatusName(pOldConsumer->status),
           pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
    // every assigned topic has to be handed out again to a recovered consumer
    int32_t numOfAssigned = taosArrayGetSize(pOldConsumer->assignedTopics);
    int32_t idx = 0;
    while (idx < numOfAssigned) {
      char *topicCopy = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, idx));
      taosArrayPush(pOldConsumer->rebNewTopics, &topicCopy);
      idx++;
    }

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;

  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
    char *addedTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));

    // the topic is no longer pending once it is being added
    removeFromNewTopicList(pOldConsumer, addedTopic);

    // put it into the current topic list unless it is already there
    if (existInCurrentTopicList(pOldConsumer, addedTopic)) {
      taosMemoryFree(addedTopic);
    } else {
      taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
    }

    // remember the status before it is re-evaluated, for the log below
    int32_t statusBefore = pOldConsumer->status;
    updateConsumerStatus(pOldConsumer);

    // the re-balance is triggered when the new consumer is launched.
    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
    mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
           ", current topics:%d, newTopics:%d, removeTopics:%d",
           pOldConsumer->consumerId, statusBefore, mndConsumerStatusName(statusBefore), pOldConsumer->status,
           mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
           (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
           (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
    // the name stays owned by pNewConsumer; it is only used for lookups here
    char *droppedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);

    removeFromRemoveTopicList(pOldConsumer, droppedTopic);
    removeFromCurrentTopicList(pOldConsumer, droppedTopic);

    // remember the status before it is re-evaluated, for the log below
    int32_t statusBefore = pOldConsumer->status;
    updateConsumerStatus(pOldConsumer);

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
           ", current topics:%d, newTopics:%d, removeTopics:%d",
           pOldConsumer->consumerId, statusBefore, mndConsumerStatusName(statusBefore), pOldConsumer->status,
           mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
           (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
           (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
  }

  taosWUnLockLatch(&pOldConsumer->lock);
  return 0;
}

L
Liu Jicong 已提交
1055
// Look up the consumer with the given id in the mnode's SDB via sdbAcquire().
// Returns the consumer, or NULL with terrno set to TSDB_CODE_MND_CONSUMER_NOT_EXIST.
// A non-NULL result is expected to be handed back through mndReleaseConsumer().
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerObj *pFound = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
1064
// Hand a consumer obtained from the SDB back through sdbRelease().
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}
L
Liu Jicong 已提交
1068

S
Shengliang Guan 已提交
1069 1070
// Fill pBlock with one row per (consumer, assigned topic) pair, continuing the SDB iteration
// stored in pShow->pIter. Columns are written in this order: consumer id, consumer group,
// client id, status, topic, up time, subscribe time, rebalance time.
// Consumers without assigned topics are skipped. Iteration stops when the SDB is exhausted or
// at least rowsCapacity rows have been produced.
// Returns the number of rows written by this call; pShow->numOfRows is increased by the same amount.
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = NULL;
  int32_t         numOfRows = 0;

  for (;;) {
    if (numOfRows >= rowsCapacity) {
      break;
    }

    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
      mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    taosRLockLatch(&pConsumer->lock);
    mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    // the size is read again under the latch; an empty list still yields one row with a NULL topic
    int32_t rowsForConsumer = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = (rowsForConsumer != 0);
    if (!hasTopic) {
      rowsForConsumer = 1;
    }

    // one consumer may need more rows than the caller's capacity leaves available
    if (numOfRows + rowsForConsumer > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + rowsForConsumer);
    }

    for (int32_t topicIdx = 0; topicIdx < rowsForConsumer; topicIdx++) {
      int32_t          cols = 0;
      SColumnInfoData *pColInfo = NULL;

      // consumer id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);

      // consumer group
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);

      // client id
      char clientId[256 + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(clientId, pConsumer->clientId);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);

      // status
      char        status[20 + VARSTR_HEADER_SIZE] = {0};
      const char *pStatusName = mndConsumerStatusName(pConsumer->status);
      STR_TO_VARSTR(status, pStatusName);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)status, false);

      // one subscribed topic (NULL when the consumer has none)
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (!hasTopic) {
        colDataSetVal(pColInfo, numOfRows, NULL, true);
      } else {
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, topicIdx));
        STR_TO_VARSTR(topic, topicName);
        colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
      }

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);

      // rebalance time, shown as NULL while it is still zero
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);

      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);

    pBlock->info.rows = numOfRows;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

// Cancel an in-progress SDB iteration over consumers by passing pIter to sdbCancelFetch().
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

static const char *mndConsumerStatusName(int status) {
  switch (status) {
    case MQ_CONSUMER_STATUS__READY:
      return "ready";
    case MQ_CONSUMER_STATUS__LOST:
    case MQ_CONSUMER_STATUS__LOST_REBD:
      return "lost";
1183
    case MQ_CONSUMER_STATUS_REBALANCE:
L
Liu Jicong 已提交
1184 1185 1186 1187 1188
      return "rebalancing";
    default:
      return "unknown";
  }
}