mndConsumer.c 39.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
L
Liu Jicong 已提交
18
#include "mndPrivilege.h"
L
Liu Jicong 已提交
19
#include "mndShow.h"
20
#include "mndSubscribe.h"
L
Liu Jicong 已提交
21 22
#include "mndTopic.h"
#include "mndTrans.h"
L
Liu Jicong 已提交
23
#include "tcompare.h"
L
Liu Jicong 已提交
24 25
#include "tname.h"

26
#define MND_CONSUMER_VER_NUMBER   1
L
Liu Jicong 已提交
27 28
#define MND_CONSUMER_RESERVE_SIZE 64

H
Haojun Liao 已提交
29
#define MND_CONSUMER_LOST_HB_CNT          6
L
Liu Jicong 已提交
30
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
31

H
Haojun Liao 已提交
32
static int32_t mqRebInExecCnt = 0;
33

L
Liu Jicong 已提交
34 35
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
36 37
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
38
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
39
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
40
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42

S
Shengliang Guan 已提交
43 44
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
45
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
46 47
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
L
Liu Jicong 已提交
48
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
49
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
50

L
Liu Jicong 已提交
51
// Registers the consumer sdb table together with the TMQ message handlers and
// the "show consumers" retrieve/free-iterator handlers.
// Returns whatever sdbSetTable() reports for the table registration.
int32_t mndInitConsumer(SMnode *pMnode) {
  SSdbTable consumerTable = {0};
  consumerTable.sdbType = SDB_CONSUMER;
  consumerTable.keyType = SDB_KEY_INT64;
  consumerTable.encodeFp = (SdbEncodeFp)mndConsumerActionEncode;
  consumerTable.decodeFp = (SdbDecodeFp)mndConsumerActionDecode;
  consumerTable.insertFp = (SdbInsertFp)mndConsumerActionInsert;
  consumerTable.updateFp = (SdbUpdateFp)mndConsumerActionUpdate;
  consumerTable.deleteFp = (SdbDeleteFp)mndConsumerActionDelete;

  // requests handled by this module
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // "show consumers" support
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  return sdbSetTable(pMnode->pSdb, consumerTable);
}

// Counterpart of mndInitConsumer(); intentionally performs no work.
void mndCleanupConsumer(SMnode *pMnode) {
}

L
Liu Jicong 已提交
78
bool mndRebTryStart() {
H
Haojun Liao 已提交
79
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
C
cadem 已提交
80
  mDebug("tq timer, rebalance counter old val:%d", old);
L
Liu Jicong 已提交
81 82 83
  return old == 0;
}

X
Xiaoyu Wang 已提交
84
// Ends a rebalance round by decrementing the rebalance counter.
void mndRebEnd() {
  mndRebCntDec();
}
L
Liu Jicong 已提交
85

S
Shengliang Guan 已提交
86 87
// Atomically increments the rebalance counter and logs the new value.
void mndRebCntInc() {
  int32_t counter = atomic_add_fetch_32(&mqRebInExecCnt, 1);
  mInfo("rebalance trans start, rebalance counter:%d", counter);
}
L
Liu Jicong 已提交
90

S
Shengliang Guan 已提交
91
void mndRebCntDec() {
92 93 94 95 96 97 98 99 100 101
  while (1) {
    int32_t val = atomic_load_32(&mqRebInExecCnt);
    if (val <= 0) {
      mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
      break;
    }

    int32_t newVal = val - 1;
    int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
    if (oldVal == val) {
C
cadem 已提交
102
      mDebug("rebalance trans end, rebalance counter:%d", newVal);
103 104 105
      break;
    }
  }
S
Shengliang Guan 已提交
106
}
L
Liu Jicong 已提交
107

S
Shengliang Guan 已提交
108 109 110
// Handles TDMT_MND_TMQ_CONSUMER_LOST: for a consumer currently in READY
// status, commits a consumer record whose updateType is CONSUMER_UPDATE__LOST.
//
// Returns 0 when the consumer no longer exists or the transaction was
// prepared, -1 when the consumer is not READY or any step fails.
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
  SMnode             *pMnode = pMsg->info.node;
  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
  if (pConsumer == NULL) {
    return 0;
  }

  mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);

  // the allocation can fail; do not dereference a NULL object
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
  if (pTrans == NULL) {
    goto _over;
  }

  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
    goto _over;
  }

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    goto _over;
  }

  code = 0;

_over:
  // the temporary object and the trans handle are released on every path
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
153 154 155
// Handles TDMT_MND_TMQ_CONSUMER_RECOVER: for a consumer in LOST_REBD status,
// commits a consumer record whose updateType is CONSUMER_UPDATE__RECOVER.
//
// Returns 0 once the transaction is prepared; -1 (with terrno set where the
// code below assigns it) when the consumer is missing, is not in LOST_REBD
// status, or any step fails.
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
  SMqConsumerObj        *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
  if (pConsumer == NULL) {
    mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
    return -1;
  }

  mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
        pConsumer->status, mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);

  // the allocation can fail; do not dereference a NULL object
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
  if (pTrans == NULL) {
    goto _over;
  }

  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
    goto _over;
  }

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    goto _over;
  }

  code = 0;

_over:
  // the temporary object and the trans handle are released on every path
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
// Handles TDMT_MND_TMQ_LOST_CONSUMER_CLEAR: for a consumer in LOST_REBD
// status, commits a drop record for that consumer.
//
// Returns 0 when the consumer no longer exists or the transaction was
// prepared, -1 when the consumer is not in LOST_REBD status or a step fails.
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
  SMqConsumerObj      *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
  if (pConsumer == NULL) {
    return 0;
  }

  mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);

  // the allocation can fail; do not dereference a NULL object
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  // NOTE(review): the LOST update type is reused for the drop record — confirm this is intended
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  if (pTrans == NULL) {
    goto _over;
  }

  if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) {
    goto _over;
  }

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    goto _over;
  }

  code = 0;

_over:
  // the temporary object and the trans handle are released on every path
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
233
// Looks up the rebalance info stored under `key` (the terminating NUL is part
// of the hash key) and creates an entry when none exists yet.
//
// Returns a pointer to the copy stored inside the hash, or NULL with terrno
// set to TSDB_CODE_OUT_OF_MEMORY when the entry cannot be created or stored.
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t keyLen = strlen(key) + 1;

  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, keyLen);
  if (pRebInfo != NULL) {
    return pRebInfo;
  }

  pRebInfo = tNewSMqRebSubscribe(key);
  if (pRebInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // The hash stores a shallow copy of the struct. On failure the arrays are
  // still owned by the temporary and must be destroyed here.
  if (taosHashPut(pHash, key, keyLen, pRebInfo, sizeof(SMqRebInfo)) != 0) {
    taosArrayDestroy(pRebInfo->newConsumers);
    taosArrayDestroy(pRebInfo->removedConsumers);
    taosMemoryFree(pRebInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // only the temporary shell is freed; the arrays now belong to the hash copy
  taosMemoryFree(pRebInfo);
  return taosHashGet(pHash, key, keyLen);
}

X
Xiaoyu Wang 已提交
248 249
static void freeRebalanceItem(void *param) {
  SMqRebInfo *pInfo = param;
H
Haojun Liao 已提交
250 251 252 253
  taosArrayDestroy(pInfo->newConsumers);
  taosArrayDestroy(pInfo->removedConsumers);
}

S
Shengliang Guan 已提交
254 255
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
256 257 258 259
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

D
dapan1121 已提交
260
  mDebug("start to process mq timer");
261

262
  // rebalance cannot be parallel
L
Liu Jicong 已提交
263
  if (!mndRebTryStart()) {
D
dapan1121 已提交
264
    mDebug("mq rebalance already in progress, do nothing");
265 266 267 268
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
269
  if (pRebMsg == NULL) {
X
Xiaoyu Wang 已提交
270
    mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
271 272 273 274
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

275
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
H
Haojun Liao 已提交
276 277 278 279 280 281 282 283
  if (pRebMsg->rebSubHash == NULL) {
    mError("failed to create rebalance hashmap");
    rpcFreeCont(pRebMsg);
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
284 285 286 287

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
288 289 290
    if (pIter == NULL) {
      break;
    }
291 292 293

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
H
Haojun Liao 已提交
294

X
Xiaoyu Wang 已提交
295 296 297
    mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", uptime:%" PRId64 ", hbstatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime,
           hbStatus);
L
Liu Jicong 已提交
298 299

    if (status == MQ_CONSUMER_STATUS__READY) {
H
Haojun Liao 已提交
300 301 302 303 304 305 306 307 308
      if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
        SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));

        pLostMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
            .msgType = TDMT_MND_TMQ_CONSUMER_LOST,
            .pCont = pLostMsg,
            .contLen = sizeof(SMqConsumerLostMsg),
        };
309

H
Haojun Liao 已提交
310 311
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
L
Liu Jicong 已提交
312
    } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
H
Haojun Liao 已提交
313
      // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
L
Liu Jicong 已提交
314 315 316 317 318 319 320 321 322
      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
        SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));

        pClearMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
            .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
            .pCont = pClearMsg,
            .contLen = sizeof(SMqConsumerClearMsg),
        };
323

L
Liu Jicong 已提交
324 325
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
326
    } else if (status == MQ_CONSUMER_STATUS__LOST) {
L
Liu Jicong 已提交
327
      taosRLockLatch(&pConsumer->lock);
328 329 330 331 332
      int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
      for (int32_t i = 0; i < topicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
333
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
334 335
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
336
      taosRUnLockLatch(&pConsumer->lock);
337
    } else {
L
Liu Jicong 已提交
338
      taosRLockLatch(&pConsumer->lock);
339

340 341 342 343 344
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
345
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
346 347 348 349 350 351 352 353
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
354
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
355 356
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
357
      taosRUnLockLatch(&pConsumer->lock);
358 359 360 361 362 363 364 365
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
L
Liu Jicong 已提交
366
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
367 368 369 370 371 372 373
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
H
Haojun Liao 已提交
374
    mDebug("mq timer finished, no need to re-balance");
L
Liu Jicong 已提交
375
    mndRebEnd();
376 377 378 379
  }
  return 0;
}

380
// Handles a consumer heartbeat: resets the consumer's hbStatus counter to 0
// and, when the consumer is in LOST_REBD status, queues a CONSUMER_RECOVER
// message for it.
//
// Returns 0 on success; -1 with terrno set when the request cannot be
// deserialized or the consumer does not exist.
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  SMnode  *pMnode = pMsg->info.node;
  SMqHbReq req = {0};

  if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t         consumerId = req.consumerId;
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " not exist", consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // The heartbeat itself was recorded. The status is unchanged, so a
      // later heartbeat takes this branch again.
      mError("consumer:0x%" PRIx64 " failed to create the consumer recover msg", consumerId);
    } else {
      pRecoverMsg->consumerId = consumerId;
      SRpcMsg pRpcMsg = {
          .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
          .pCont = pRecoverMsg,
          .contLen = sizeof(SMqConsumerRecoverMsg),
      };
      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
    }
  }

  mndReleaseConsumer(pMnode, pConsumer);

  return 0;
}

S
Shengliang Guan 已提交
419
// Handles an "ask endpoint" request from a consumer.
//
// Resets the consumer's hbStatus counter, queues a recover message when the
// consumer is in LOST_REBD status, and requires READY status to continue.
// When the client epoch differs from the server epoch, the response lists,
// per currently subscribed topic, the topic db/schema and the vgroups
// assigned to this consumer; otherwise the response carries only the header.
//
// Returns 0 with pMsg->info.rsp / rspLen filled in, or -1 with terrno set.
// The consumer reference taken here is released on every path after the
// acquire succeeds.
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  SMnode     *pMnode = pMsg->info.node;
  SMqAskEpReq req = {0};
  SMqAskEpRsp rsp = {0};

  if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t consumerId = req.consumerId;
  int32_t epoch = req.epoch;

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
  if (ret != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    goto FAIL;  // the consumer reference must be released
  }

  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer:0x%" PRIx64, consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // the status is unchanged, so a later request takes this branch again
      mError("consumer:0x%" PRIx64 " failed to create the consumer recover msg", consumerId);
    } else {
      pRecoverMsg->consumerId = consumerId;
      SRpcMsg pRpcMsg = {
          .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
          .pCont = pRecoverMsg,
          .contLen = sizeof(SMqConsumerRecoverMsg),
      };

      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
    }
  }

  if (status != MQ_CONSUMER_STATUS__READY) {
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    goto FAIL;  // the consumer reference must be released
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

  // 2. check epoch, only send ep info when epochs do not match
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
          serverEpoch);
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosRUnLockLatch(&pConsumer->lock);
      goto FAIL;
    }

    // handle all topics subscribed by this consumer
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
      // txn guarantees pSub is created

      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      tstrncpy(topicEp.topic, topic, sizeof(topicEp.topic));

      // 2.1 fetch topic schema
      // NOTE(review): pTopic is used without a NULL check, as in the original — confirm it cannot be dropped here
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      bool         schemaOk = true;
      taosRLockLatch(&pTopic->lock);
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
      topicEp.schema.nCols = pTopic->schema.nCols;
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        if (topicEp.schema.pSchema == NULL) {
          schemaOk = false;
        } else {
          memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
        }
      }
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      if (!schemaOk) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        taosRUnLockLatch(&pConsumer->lock);
        goto FAIL;
      }

      // 2.2 iterate all vg assigned to the consumer of that topic
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      int32_t        vgNum = taosArrayGetSize(pConsumerEp->vgs);

      // vgroups assigned to this consumer
      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        // topicEp has not been pushed into rsp.topics yet, so its schema copy is freed here
        taosMemoryFree(topicEp.schema.pSchema);
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        taosRUnLockLatch(&pConsumer->lock);
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }

  // encode rsp
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;  // frees rsp and releases the consumer reference
  }

  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = serverEpoch;
  ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;

  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  tEncodeSMqAskEpRsp(&abuf, &rsp);

  // release consumer and free memory
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
  return 0;

FAIL:
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

L
Liu Jicong 已提交
578 579 580 581 582 583 584 585
// Encodes pConsumer, appends the raw record to the commit log of pTrans and
// marks it SDB_STATUS_DROPPED.
// Returns 0 on success, -1 on failure. pMnode is not used.
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;

  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    // the record was not attached to the trans, so it is still ours to free
    sdbFreeRaw(pCommitRaw);
    return -1;
  }

  // from here on the raw record is referenced by the trans; do not free it
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

586 587 588 589 590 591 592 593
// Encodes pConsumer, appends the raw record to the commit log of pTrans and
// marks it SDB_STATUS_READY.
// Returns 0 on success, -1 on failure. pMnode is not used.
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;

  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    // the record was not attached to the trans, so it is still ours to free
    sdbFreeRaw(pCommitRaw);
    return -1;
  }

  // from here on the raw record is referenced by the trans; do not free it
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}

X
Xiaoyu Wang 已提交
594
// Verifies that every topic name in pTopicList resolves to a topic object and
// that pUser passes the MND_OPER_SUBSCRIBE privilege check on it.
// Returns 0 when all topics pass, -1 at the first topic that does not.
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
  int32_t total = taosArrayGetSize(pTopicList);
  int32_t idx = 0;

  while (idx < total) {
    char        *topicName = taosArrayGetP(pTopicList, idx);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topicName);
    if (pTopic == NULL) {
      // the original relies on the callee to have set terrno
      return -1;
    }

    int32_t denied = mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic);
    mndReleaseTopic(pMnode, pTopic);
    if (denied != 0) {
      return -1;
    }

    idx++;
  }

  return 0;
}

X
Xiaoyu Wang 已提交
615
// Element-copy callback: returns a taosStrdup() copy of the string p.
static void *topicNameDup(void *p) {
  char *name = (char *)p;
  return taosStrdup(name);
}
616

X
Xiaoyu Wang 已提交
617 618
// Element-free callback: param points at an array slot holding a pointer;
// frees that pointer when it is non-NULL.
static void freeItem(void *param) {
  void **pSlot = (void **)param;
  void  *pItem = *pSlot;
  if (pItem == NULL) {
    return;
  }
  taosMemoryFree(pItem);
}

624 625 626 627
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  char   *msgStr = pMsg->pCont;

628 629
  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
630 631

  uint64_t        consumerId = subscribe.consumerId;
632
  char           *cgroup = subscribe.cgroup;
H
Haojun Liao 已提交
633
  SMqConsumerObj *pExistedConsumer = NULL;
634 635 636
  SMqConsumerObj *pConsumerNew = NULL;

  int32_t code = -1;
637 638
  SArray *pTopicList = subscribe.topicNames;
  taosArraySort(pTopicList, taosArrayCompareString);
639
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
640

641
  int32_t newTopicNum = taosArrayGetSize(pTopicList);
642

H
Haojun Liao 已提交
643
  // check topic existence
644
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
645 646 647
  if (pTrans == NULL) {
    goto _over;
  }
L
Liu Jicong 已提交
648

649 650 651
  code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
  if (code != TSDB_CODE_SUCCESS) {
    goto _over;
652 653
  }

H
Haojun Liao 已提交
654 655
  pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pExistedConsumer == NULL) {
X
Xiaoyu Wang 已提交
656 657
    mInfo("receive subscribe request from new consumer:0x%" PRIx64 " cgroup:%s, numOfTopics:%d", consumerId,
          subscribe.cgroup, (int32_t)taosArrayGetSize(pTopicList));
L
Liu Jicong 已提交
658

659
    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
660
    tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
661 662

    // set the update type
663
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
H
Haojun Liao 已提交
664
    taosArrayDestroy(pConsumerNew->assignedTopics);
665 666 667
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    // all subscribed topics should re-balance.
L
Liu Jicong 已提交
668
    taosArrayDestroy(pConsumerNew->rebNewTopics);
669
    pConsumerNew->rebNewTopics = pTopicList;
670 671
    subscribe.topicNames = NULL;

672 673
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
674 675

  } else {
H
Haojun Liao 已提交
676 677
    /*taosRLockLatch(&pExistedConsumer->lock);*/
    int32_t status = atomic_load_32(&pExistedConsumer->status);
L
Liu Jicong 已提交
678

X
Xiaoyu Wang 已提交
679 680 681
    mInfo("receive subscribe request from existed consumer:0x%" PRIx64
          " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum);
L
Liu Jicong 已提交
682

683 684
    if (status != MQ_CONSUMER_STATUS__READY) {
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
685
      goto _over;
686 687 688 689
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
690
      goto _over;
691
    }
H
Haojun Liao 已提交
692

693
    // set the update type
694
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
H
Haojun Liao 已提交
695
    taosArrayDestroy(pConsumerNew->assignedTopics);
696
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
697

X
Xiaoyu Wang 已提交
698
    int32_t oldTopicNum = (pExistedConsumer->currentTopics) ? taosArrayGetSize(pExistedConsumer->currentTopics) : 0;
699 700 701 702

    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
H
Haojun Liao 已提交
703
        char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
704 705 706 707
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
        continue;
      } else if (j >= newTopicNum) {
H
Haojun Liao 已提交
708
        char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
709 710 711 712
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
        continue;
      } else {
H
Haojun Liao 已提交
713
        char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
714
        char *newTopic = taosArrayGetP(pTopicList, j);
715
        int   comp = strcmp(oldTopic, newTopic);
716 717 718 719 720
        if (comp == 0) {
          i++;
          j++;
          continue;
        } else if (comp < 0) {
721
          char *oldTopicCopy = taosStrdup(oldTopic);
722 723 724 725
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
          continue;
        } else {
726
          char *newTopicCopy = taosStrdup(newTopic);
727 728 729 730 731 732 733
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
          continue;
        }
      }
    }

734 735
    // no topics need to be rebalanced
    if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
736
      goto _over;
737 738
    }

739 740
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
741 742
  }

S
Shengliang Guan 已提交
743
  code = TSDB_CODE_ACTION_IN_PROGRESS;
744

745
_over:
L
Liu Jicong 已提交
746 747
  mndTransDrop(pTrans);

H
Haojun Liao 已提交
748 749 750
  if (pExistedConsumer) {
    /*taosRUnLockLatch(&pExistedConsumer->lock);*/
    mndReleaseConsumer(pMnode, pExistedConsumer);
751
  }
H
Haojun Liao 已提交
752

753 754
  if (pConsumerNew) {
    tDeleteSMqConsumerObj(pConsumerNew);
L
Liu Jicong 已提交
755
    taosMemoryFree(pConsumerNew);
756
  }
757

758
  // TODO: replace with destroy subscribe msg
759
  taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
760
  return code;
L
Liu Jicong 已提交
761 762
}

L
Liu Jicong 已提交
763 764
// Serializes pConsumer into a newly allocated sdb raw record laid out as
// [int32 length][encoded consumer][MND_CONSUMER_RESERVE_SIZE reserved bytes].
// Returns the raw record, or NULL with terrno left non-zero on failure.
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  void    *encBuf = NULL;
  SSdbRaw *pRaw = NULL;
  int32_t  dataPos = 0;

  // pessimistic default; cleared only after every step has succeeded
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t encLen = tEncodeSMqConsumerObj(NULL, pConsumer);
  int32_t rawSize = sizeof(int32_t) + encLen + MND_CONSUMER_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    goto CM_ENCODE_OVER;
  }

  encBuf = taosMemoryMalloc(encLen);
  if (encBuf == NULL) {
    goto CM_ENCODE_OVER;
  }

  // the encoder advances the cursor it is handed, so give it a scratch copy
  void *cursor = encBuf;
  tEncodeSMqConsumerObj(&cursor, pConsumer);

  SDB_SET_INT32(pRaw, dataPos, encLen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, encBuf, encLen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(encBuf);

  if (terrno == 0) {
    mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
    return pRaw;
  }

  mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

L
Liu Jicong 已提交
799
// Rebuilds a consumer row from an sdb raw record written by
// mndConsumerActionEncode().
//
// Returns the new row, or NULL with terrno set on failure. terrno is set
// pessimistically on entry and cleared only after every step has succeeded,
// so a stale error code left by an earlier, unrelated call can no longer make
// a successful decode look like a failure.
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *buf = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }

  if (sver != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  int32_t dataPos = 0;
  int32_t len;
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

  terrno = TSDB_CODE_SUCCESS;

CM_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    // taosMemoryFreeClear is relied on (as in the original) to reset pRow, so NULL is returned
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}

L
Liu Jicong 已提交
854
// SDB insert hook for a consumer row.
// Logs the consumer's id, group, status and epoch, then sets subscribeTime to the row's upTime.
// pSdb is not used. Always returns 0.
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *statusName = mndConsumerStatusName(pConsumer->status);

  mDebug("consumer:0x%" PRIx64 " cgroup:%s status:%d(%s) epoch:%d load from sdb, perform insert action",
         pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, statusName, pConsumer->epoch);

  pConsumer->subscribeTime = pConsumer->upTime;

  return 0;
}

L
Liu Jicong 已提交
862
// SDB delete hook for a consumer row.
// Logs the consumer's id and status, then passes the object to tDeleteSMqConsumerObj.
// pSdb is not used. Always returns 0.
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *statusName = mndConsumerStatusName(pConsumer->status);

  mDebug("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
         statusName);

  tDeleteSMqConsumerObj(pConsumer);

  return 0;
}

X
Xiaoyu Wang 已提交
869
// Settles the consumer's status once no rebalance work is pending.
// When both rebNewTopics and rebRemovedTopics are empty:
//   MODIFY -> READY
//   LOST   -> LOST_REBD
// Any other status, or any pending topic in either list, leaves the status unchanged.
static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
  int32_t prevStatus = pConsumer->status;

  if (taosArrayGetSize(pConsumer->rebNewTopics) != 0 || taosArrayGetSize(pConsumer->rebRemovedTopics) != 0) {
    return;
  }

  switch (prevStatus) {
    case MQ_CONSUMER_STATUS__MODIFY:
      pConsumer->status = MQ_CONSUMER_STATUS__READY;
      break;
    case MQ_CONSUMER_STATUS__LOST:
      pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
      break;
    default:
      break;
  }
}

// remove from new topic
X
Xiaoyu Wang 已提交
882
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
883 884 885 886 887 888 889 890
  int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
  for (int32_t i = 0; i < taosArrayGetSize(pConsumer->rebNewTopics); i++) {
    char *p = taosArrayGetP(pConsumer->rebNewTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebNewTopics, i);
      taosMemoryFree(p);

      mDebug("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
X
Xiaoyu Wang 已提交
891
             pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics));
892 893 894 895 896 897
      break;
    }
  }
}

// remove from removed topic
X
Xiaoyu Wang 已提交
898
static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
899 900 901 902 903 904 905 906 907 908 909
  int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics);
  for (int32_t i = 0; i < size; i++) {
    char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebRemovedTopics, i);
      taosMemoryFree(p);
      break;
    }
  }
}

L
Liu Jicong 已提交
910
// SDB update hook for a consumer row.
// Applies the change described by pNewConsumer->updateType to pOldConsumer while holding
// pOldConsumer's write latch:
//   MODIFY  - swap rebNewTopics / rebRemovedTopics / assignedTopics with pNewConsumer's arrays,
//             set subscribeTime to the new upTime, status -> MODIFY
//   LOST    - append a copy of every current topic to rebRemovedTopics, status -> LOST
//   RECOVER - append a copy of every assigned topic to rebNewTopics, status -> MODIFY
//   TOUCH   - bump the epoch
//   ADD     - take the first topic of pNewConsumer->rebNewTopics, drop it from the pending new-topic
//             list and add it to currentTopics (kept sorted) unless already present
//   REMOVE  - take the first topic of pNewConsumer->rebRemovedTopics and drop it from the pending
//             removed-topic list and from currentTopics
// Every type except MODIFY also sets rebalanceTime to pNewConsumer->upTime.
// ADD and REMOVE re-evaluate the status through updateConsumerStatus and bump the epoch.
// Any other update type only takes and releases the latch.
// pSdb is not used. Always returns 0.
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64,
         pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime);

  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
    // Exchange the arrays instead of copying them; pNewConsumer ends up holding the previous lists.
    SArray *tmp = pOldConsumer->rebNewTopics;
    pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
    pNewConsumer->rebNewTopics = tmp;

    tmp = pOldConsumer->rebRemovedTopics;
    pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
    pNewConsumer->rebRemovedTopics = tmp;

    tmp = pOldConsumer->assignedTopics;
    pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
    pNewConsumer->assignedTopics = tmp;

    pOldConsumer->subscribeTime = pNewConsumer->upTime;
    pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
      if (topic == NULL) {
        // Never store a NULL name: the list is later scanned with strcmp.
        mError("consumer:0x%" PRIx64 " failed to copy current topic at index:%d, out of memory",
               pOldConsumer->consumerId, i);
        continue;
      }
      taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
    }

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    int32_t status = pOldConsumer->status;
    pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
    mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
           pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
           pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
    int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
      if (topic == NULL) {
        // Never store a NULL name: the list is later scanned with strcmp.
        mError("consumer:0x%" PRIx64 " failed to copy assigned topic at index:%d, out of memory",
               pOldConsumer->consumerId, i);
        continue;
      }
      taosArrayPush(pOldConsumer->rebNewTopics, &topic);
    }

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
    // Only read element 0 when it exists, and only duplicate a non-NULL name.
    char *pSrcTopic = NULL;
    if (taosArrayGetSize(pNewConsumer->rebNewTopics) > 0) {
      pSrcTopic = taosArrayGetP(pNewConsumer->rebNewTopics, 0);
    }
    char *pNewTopic = (pSrcTopic != NULL) ? taosStrdup(pSrcTopic) : NULL;

    if (pNewTopic == NULL) {
      mError("consumer:0x%" PRIx64 " no topic to add or out of memory, topic lists are left unchanged",
             pOldConsumer->consumerId);
    } else {
      // check whether the topic is already one of the current topics
      bool    existing = false;
      int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics);
      for (int32_t i = 0; i < numOfExistedTopics; i++) {
        char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
        if (strcmp(topic, pNewTopic) == 0) {
          existing = true;
          break;
        }
      }

      removeFromNewTopicList(pOldConsumer, pNewTopic);

      // add to current topic; the array takes ownership of the copy
      if (!existing) {
        taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
        taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
      } else {
        taosMemoryFree(pNewTopic);
      }
    }

    // set status
    int32_t status = pOldConsumer->status;
    updateConsumerStatus(pOldConsumer);

    // the re-balance is triggered when the new consumer is launched.
    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
    mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
           ", current topics:%d, newTopics:%d, removeTopics:%d",
           pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
           mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
           (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
           (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
    // Borrowed pointer into pNewConsumer's list; it is only compared, never freed here.
    char *removedTopic = NULL;
    if (taosArrayGetSize(pNewConsumer->rebRemovedTopics) > 0) {
      removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
    }

    if (removedTopic == NULL) {
      mError("consumer:0x%" PRIx64 " no topic to remove, topic lists are left unchanged", pOldConsumer->consumerId);
    } else {
      // remove from removed topic
      removeFromRemoveTopicList(pOldConsumer, removedTopic);

      // remove from current topic
      int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
      for (int32_t i = 0; i < sz; i++) {
        char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
        if (strcmp(removedTopic, topic) == 0) {
          taosArrayRemove(pOldConsumer->currentTopics, i);
          taosMemoryFree(topic);
          break;
        }
      }
    }

    // set status
    int32_t status = pOldConsumer->status;
    updateConsumerStatus(pOldConsumer);

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
           ", current topics:%d, newTopics:%d, removeTopics:%d",
           pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
           mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
           (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
           (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
  }

  taosWUnLockLatch(&pOldConsumer->lock);
  return 0;
}

L
Liu Jicong 已提交
1035
// Looks up the consumer with the given id via sdbAcquire on the SDB_CONSUMER table.
// Returns the acquired object, or NULL with terrno set to TSDB_CODE_MND_CONSUMER_NOT_EXIST
// when sdbAcquire yields nothing. A non-NULL result is handed back with mndReleaseConsumer.
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerObj *pFound = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
1044
// Hands a consumer object back to the mnode's SDB through sdbRelease.
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}
L
Liu Jicong 已提交
1048

S
Shengliang Guan 已提交
1049 1050
// Fills pBlock with rows describing consumers, one row per (consumer, assigned topic) pair.
// Iterates the SDB_CONSUMER table from pShow->pIter until the iterator is exhausted or at least
// rowsCapacity rows have been produced. Consumers without assigned topics are skipped.
// Columns written, in order: consumer id, consumer group, client id, status, topic, up time,
// subscribe time, rebalance time (NULL when it is 0). The end-point column is not emitted.
// Adds the produced row count to pShow->numOfRows and returns it.
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    // Inspect assignedTopics only under the read latch: the update action replaces that array
    // while holding the write latch.
    taosRLockLatch(&pConsumer->lock);

    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    if (topicSz == 0) {
      mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      taosRUnLockLatch(&pConsumer->lock);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    // One consumer may need more rows than remain in the block; grow it before writing.
    // NOTE(review): the result of blockDataEnsureCapacity is not checked - confirm it cannot fail here.
    if (numOfRows + topicSz > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
    }

    // These values are identical for every topic row of this consumer, so build them once.
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(cgroup, pConsumer->cgroup);

    char clientId[256 + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(clientId, pConsumer->clientId);

    char        status[20 + VARSTR_HEADER_SIZE] = {0};
    const char *pStatusName = mndConsumerStatusName(pConsumer->status);
    STR_TO_VARSTR(status, pStatusName);

    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // consumer id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);

      // consumer group
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);

      // client id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);

      // status
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)status, false);

      // one subscribed topic
      char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
      STR_TO_VARSTR(topic, topicName);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);

      // rebalance time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);

      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);

    pBlock->info.rows = numOfRows;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

// Cancels an in-progress consumer iteration by passing pIter to sdbCancelFetch on the mnode's SDB.
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

// Maps a consumer status code to the name used in logs and in the consumer listing.
// READY -> "ready"; LOST and LOST_REBD -> "lost"; MODIFY -> "rebalancing"; anything else -> "unknown".
// The returned strings are literals and must not be freed.
static const char *mndConsumerStatusName(int status) {
  if (status == MQ_CONSUMER_STATUS__READY) {
    return "ready";
  }

  if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_REBD) {
    return "lost";
  }

  if (status == MQ_CONSUMER_STATUS__MODIFY) {
    return "rebalancing";
  }

  return "unknown";
}