mndConsumer.c 41.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
L
Liu Jicong 已提交
18
#include "mndPrivilege.h"
L
Liu Jicong 已提交
19
#include "mndShow.h"
20
#include "mndSubscribe.h"
L
Liu Jicong 已提交
21 22
#include "mndTopic.h"
#include "mndTrans.h"
L
Liu Jicong 已提交
23
#include "tcompare.h"
L
Liu Jicong 已提交
24 25
#include "tname.h"

26
#define MND_CONSUMER_VER_NUMBER   1
L
Liu Jicong 已提交
27 28
#define MND_CONSUMER_RESERVE_SIZE 64

H
Haojun Liao 已提交
29
#define MND_CONSUMER_LOST_HB_CNT          6
L
Liu Jicong 已提交
30
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
31

H
Haojun Liao 已提交
32
static int32_t mqRebInExecCnt = 0;
33

L
Liu Jicong 已提交
34 35
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
36 37
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
38
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
39
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
40
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42

S
Shengliang Guan 已提交
43 44
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
45
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
46 47
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
L
Liu Jicong 已提交
48
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
49
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
50

L
Liu Jicong 已提交
51
/*
 * Wire the consumer module into the mnode: message handlers, "show consumers" handlers,
 * and the SDB_CONSUMER table (int64 key) with its encode/decode/insert/update/delete callbacks.
 * Returns the result of sdbSetTable().
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  // TMQ requests and internal timer-driven messages
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // "show consumers"
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  SSdbTable consumerTable = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, consumerTable);
}

/* Module teardown hook; the consumer module has nothing to release here. */
void mndCleanupConsumer(SMnode *pMnode) {
  (void)pMnode;
}

L
Liu Jicong 已提交
78
bool mndRebTryStart() {
H
Haojun Liao 已提交
79
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
80
  mDebug("tq timer, rebalance counter old val:%d", old);
L
Liu Jicong 已提交
81 82 83
  return old == 0;
}

S
Shengliang Guan 已提交
84
/* Finish a rebalance round claimed by mndRebTryStart(): decrements the in-flight counter. */
void mndRebEnd() {
  mndRebCntDec();
}
L
Liu Jicong 已提交
87

S
Shengliang Guan 已提交
88 89
/* Atomically bump the in-flight rebalance counter and log its new value. */
void mndRebCntInc() {
  int32_t curCnt = atomic_add_fetch_32(&mqRebInExecCnt, 1);
  mInfo("rebalance trans start, rebalance counter:%d", curCnt);
}
L
Liu Jicong 已提交
92

S
Shengliang Guan 已提交
93
void mndRebCntDec() {
94 95 96 97 98 99 100 101 102 103
  while (1) {
    int32_t val = atomic_load_32(&mqRebInExecCnt);
    if (val <= 0) {
      mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
      break;
    }

    int32_t newVal = val - 1;
    int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
    if (oldVal == val) {
104
      mDebug("rebalance trans end, rebalance counter:%d", newVal);
105 106 107
      break;
    }
  }
S
Shengliang Guan 已提交
108
}
L
Liu Jicong 已提交
109

S
Shengliang Guan 已提交
110 111 112
/*
 * Handle TDMT_MND_TMQ_CONSUMER_LOST, which the mq timer queues for a READY consumer that
 * stopped sending heartbeats. Persists a CONSUMER_UPDATE__LOST update through a "lost-csm"
 * transaction (rollback policy).
 *
 * Returns 0 when the consumer no longer exists or the transaction was prepared; -1 when the
 * consumer is not READY, on allocation failure (terrno = TSDB_CODE_OUT_OF_MEMORY), or when
 * creating/preparing the transaction fails.
 */
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
  SMnode             *pMnode = pMsg->info.node;
  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
  if (pConsumer == NULL) {
    // Already gone: nothing left to mark as lost.
    return 0;
  }

  mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  // Build the update object from the acquired consumer, then drop our reference to it.
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
  if (pTrans != NULL && mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) == 0 &&
      mndTransPrepare(pMnode, pTrans) == 0) {
    code = 0;
  }

  // Single cleanup path for success and failure (the original also passed a NULL pTrans here).
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
155 156 157
/*
 * Handle TDMT_MND_TMQ_CONSUMER_RECOVER, queued when a consumer in LOST_REBD status contacts the
 * mnode again. Persists a CONSUMER_UPDATE__RECOVER update through a "recover-csm" transaction
 * (retry policy).
 *
 * Returns 0 when the transaction was prepared; -1 when the consumer cannot be found, is not in
 * LOST_REBD (terrno = TSDB_CODE_MND_CONSUMER_NOT_READY), on allocation failure
 * (terrno = TSDB_CODE_OUT_OF_MEMORY), or when creating/preparing the transaction fails.
 */
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
  SMqConsumerObj        *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
  if (pConsumer == NULL) {
    mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
    return -1;
  }

  mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
        pConsumer->status, mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  // Build the update object from the acquired consumer, then drop our reference to it.
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
  if (pTrans != NULL && mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) == 0 &&
      mndTransPrepare(pMnode, pTrans) == 0) {
    code = 0;
  }

  // Single cleanup path for success and failure (the original also passed a NULL pTrans here).
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
/*
 * Handle TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, queued by the mq timer for a LOST_REBD consumer whose
 * missed-heartbeat count exceeded MND_CONSUMER_LOST_CLEAR_THRESHOLD. Drops the consumer row through
 * a "clear-csm" transaction (rollback policy).
 *
 * Returns 0 when the consumer no longer exists or the transaction was prepared; -1 when the
 * consumer is not LOST_REBD, on allocation failure (terrno = TSDB_CODE_OUT_OF_MEMORY), or when
 * creating/preparing the transaction fails.
 */
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
  SMqConsumerObj      *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
  if (pConsumer == NULL) {
    // Already removed: nothing to clear.
    return 0;
  }

  mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  // Build the object whose encoded form is written as a DROPPED row, then drop our reference.
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  if (pTrans != NULL && mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) == 0 &&
      mndTransPrepare(pMnode, pTrans) == 0) {
    code = 0;
  }

  // Single cleanup path for success and failure (the original also passed a NULL pTrans here).
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
235
/*
 * Return the rebalance entry stored in pHash under key (NUL terminator included in the key
 * length), creating an empty one with tNewSMqRebSubscribe() when missing.
 *
 * The hash stores a shallow copy of the SMqRebInfo struct, so after a successful put the arrays
 * inside it belong to the hash entry and only the temporary outer allocation is freed here.
 *
 * Returns the entry inside the hash, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, keyLen);
  if (pRebInfo != NULL) {
    return pRebInfo;
  }

  SMqRebInfo *pNewInfo = tNewSMqRebSubscribe(key);
  if (pNewInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (taosHashPut(pHash, key, keyLen, pNewInfo, sizeof(SMqRebInfo)) != 0) {
    // The hash did not take ownership: release the arrays ourselves instead of leaking them.
    taosArrayDestroy(pNewInfo->lostConsumers);
    taosArrayDestroy(pNewInfo->newConsumers);
    taosArrayDestroy(pNewInfo->removedConsumers);
    taosMemoryFree(pNewInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  taosMemoryFree(pNewInfo);

  return taosHashGet(pHash, key, keyLen);
}

H
Haojun Liao 已提交
250 251 252 253 254 255 256
static void freeRebalanceItem(void* param) {
  SMqRebInfo* pInfo = param;
  taosArrayDestroy(pInfo->lostConsumers);
  taosArrayDestroy(pInfo->newConsumers);
  taosArrayDestroy(pInfo->removedConsumers);
}

S
Shengliang Guan 已提交
257 258
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
259 260 261 262
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

263
  mDebug("start to process mq timer");
264

265
  // rebalance cannot be parallel
L
Liu Jicong 已提交
266
  if (!mndRebTryStart()) {
267
    mDebug("mq rebalance already in progress, do nothing");
268 269 270 271
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
272
  if (pRebMsg == NULL) {
H
Haojun Liao 已提交
273
    mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t) sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
274 275 276 277
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

278
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
H
Haojun Liao 已提交
279 280 281 282 283 284 285 286
  if (pRebMsg->rebSubHash == NULL) {
    mError("failed to create rebalance hashmap");
    rpcFreeCont(pRebMsg);
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
287 288 289 290

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
291 292 293
    if (pIter == NULL) {
      break;
    }
294 295 296

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
H
Haojun Liao 已提交
297

X
Xiaoyu Wang 已提交
298 299 300
    mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", uptime:%" PRId64 ", hbstatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime,
           hbStatus);
L
Liu Jicong 已提交
301 302

    if (status == MQ_CONSUMER_STATUS__READY) {
H
Haojun Liao 已提交
303 304 305 306 307 308 309 310 311
      if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
        SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));

        pLostMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
            .msgType = TDMT_MND_TMQ_CONSUMER_LOST,
            .pCont = pLostMsg,
            .contLen = sizeof(SMqConsumerLostMsg),
        };
312

H
Haojun Liao 已提交
313 314
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
L
Liu Jicong 已提交
315
    } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
H
Haojun Liao 已提交
316
      // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
L
Liu Jicong 已提交
317 318 319 320 321 322 323 324 325
      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
        SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));

        pClearMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
            .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
            .pCont = pClearMsg,
            .contLen = sizeof(SMqConsumerClearMsg),
        };
326

L
Liu Jicong 已提交
327 328
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
329
    } else if (status == MQ_CONSUMER_STATUS__LOST) {
L
Liu Jicong 已提交
330
      taosRLockLatch(&pConsumer->lock);
331 332 333 334 335
      int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
      for (int32_t i = 0; i < topicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
336
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
337 338
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
339
      taosRUnLockLatch(&pConsumer->lock);
340
    } else if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
L
Liu Jicong 已提交
341
      taosRLockLatch(&pConsumer->lock);
342

343 344 345 346 347
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
348
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
349 350 351 352 353 354 355 356
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
357
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
358 359
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
360
      taosRUnLockLatch(&pConsumer->lock);
361 362 363 364 365 366 367 368 369 370
    } else {
      // do nothing
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
L
Liu Jicong 已提交
371
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
372 373 374 375 376 377 378
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
H
Haojun Liao 已提交
379
    mDebug("mq timer finished, no need to re-balance");
L
Liu Jicong 已提交
380
    mndRebEnd();
381 382 383 384
  }
  return 0;
}

385
/*
 * Handle TDMT_MND_TMQ_HB from a consumer.
 *
 * Resets the consumer's missed-heartbeat counter (hbStatus, incremented by the mq timer) and, if
 * the consumer is in LOST_REBD status, queues a TDMT_MND_TMQ_CONSUMER_RECOVER message for it.
 *
 * Returns 0 on success; -1 with terrno set when the request cannot be decoded or the consumer
 * does not exist.
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  SMnode  *pMnode = pMsg->info.node;
  SMqHbReq req = {0};

  if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t         consumerId = req.consumerId;
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " not exist", consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // The status stays LOST_REBD, so the next heartbeat retries the recovery.
      mError("consumer:0x%" PRIx64 " failed to alloc consumer recover msg", consumerId);
    } else {
      pRecoverMsg->consumerId = consumerId;
      SRpcMsg rpcMsg = {
          .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
          .pCont = pRecoverMsg,
          .contLen = sizeof(SMqConsumerRecoverMsg),
      };
      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
    }
  }

  mndReleaseConsumer(pMnode, pConsumer);

  return 0;
}

S
Shengliang Guan 已提交
424
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
L
Liu Jicong 已提交
425 426 427
  SMnode     *pMnode = pMsg->info.node;
  SMqAskEpReq req = {0};
  SMqAskEpRsp rsp = {0};
D
dapan1121 已提交
428 429 430 431 432 433

  if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

L
Liu Jicong 已提交
434 435
  int64_t consumerId = req.consumerId;
  int32_t epoch = req.epoch;
436

437
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
438
  if (pConsumer == NULL) {
H
Haojun Liao 已提交
439
    mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
440 441 442 443
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

H
Haojun Liao 已提交
444 445 446 447 448 449 450
  int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
  if (ret != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }
451

452 453 454 455
  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);
L
Liu Jicong 已提交
456

L
Liu Jicong 已提交
457
#if 1
L
Liu Jicong 已提交
458
  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
X
Xiaoyu Wang 已提交
459
    mInfo("try to recover consumer:0x%" PRIx64, consumerId);
L
Liu Jicong 已提交
460 461 462
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));

    pRecoverMsg->consumerId = consumerId;
463
    SRpcMsg pRpcMsg = {
L
Liu Jicong 已提交
464
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
465 466 467
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
H
Haojun Liao 已提交
468

469
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
470
  }
471
#endif
472 473

  if (status != MQ_CONSUMER_STATUS__READY) {
X
Xiaoyu Wang 已提交
474
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
475 476 477 478 479 480
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

481
  // 2. check epoch, only send ep info when epochs do not match
482 483
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
X
Xiaoyu Wang 已提交
484 485
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
          serverEpoch);
486 487 488 489 490
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
491
      taosRUnLockLatch(&pConsumer->lock);
492 493 494
      goto FAIL;
    }

H
Haojun Liao 已提交
495
    // handle all topics subscribed by this consumer
496 497 498 499
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
      // txn guarantees pSub is created
L
Liu Jicong 已提交
500

501 502 503 504 505 506 507 508
      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      strcpy(topicEp.topic, topic);

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      taosRLockLatch(&pTopic->lock);
L
Liu Jicong 已提交
509
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
510
      topicEp.schema.nCols = pTopic->schema.nCols;
L
Liu Jicong 已提交
511 512 513 514
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
      }
515 516 517 518
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      // 2.2 iterate all vg assigned to the consumer of that topic
519 520
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      int32_t        vgNum = taosArrayGetSize(pConsumerEp->vgs);
521

H
Haojun Liao 已提交
522
      // this customer assigned vgroups
523 524 525
      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
526
        taosRUnLockLatch(&pConsumer->lock);
L
Liu Jicong 已提交
527 528
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
529 530 531 532
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
533
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551
        char     offsetKey[TSDB_PARTITION_KEY_LEN];
        mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }
H
Haojun Liao 已提交
552

553
  // encode rsp
L
Liu Jicong 已提交
554
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
555 556
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
557
    terrno = TSDB_CODE_OUT_OF_MEMORY;
558
    return -1;
L
Liu Jicong 已提交
559
  }
H
Haojun Liao 已提交
560

561 562 563
  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = serverEpoch;
  ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
S
Shengliang Guan 已提交
564

565
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
566
  tEncodeSMqAskEpRsp(&abuf, &rsp);
567 568

  // release consumer and free memory
L
Liu Jicong 已提交
569
  tDeleteSMqAskEpRsp(&rsp);
570 571 572
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
S
Shengliang Guan 已提交
573 574
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
575
  return 0;
H
Haojun Liao 已提交
576

577
FAIL:
L
Liu Jicong 已提交
578
  tDeleteSMqAskEpRsp(&rsp);
579 580 581 582
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

L
Liu Jicong 已提交
583 584 585 586 587 588 589 590
/*
 * Encode pConsumer and append it to pTrans as a commit log whose raw status is SDB_STATUS_DROPPED.
 * Returns 0 on success, -1 if encoding, appending, or setting the status fails (checked in that order).
 */
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);

  bool failed = pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
                sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0;
  return failed ? -1 : 0;
}

591 592 593 594 595 596 597 598
/*
 * Encode pConsumer and append it to pTrans as a commit log whose raw status is SDB_STATUS_READY.
 * Returns 0 on success, -1 if encoding, appending, or setting the status fails (checked in that order).
 */
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);

  bool failed = pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
                sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0;
  return failed ? -1 : 0;
}

599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619
/*
 * Check that every topic in pTopicList exists and that pUser holds MND_OPER_SUBSCRIBE on it.
 * Stops at the first failing topic. Returns 0 if all topics pass, -1 otherwise.
 */
static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const char* pUser) {
  int32_t result = 0;

  for (int32_t idx = 0, total = taosArrayGetSize(pTopicList); idx < total && result == 0; idx++) {
    char        *topicName = taosArrayGetP(pTopicList, idx);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topicName);
    if (pTopic == NULL) {  // terrno has been set by callee function
      return -1;
    }

    result = (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) ? -1 : 0;
    mndReleaseTopic(pMnode, pTopic);
  }

  return result;
}

620 621 622 623
/* Element-duplication callback for taosArrayDup: returns a heap copy of the topic name string. */
static void* topicNameDup(void* p) {
  char* topicName = (char*)p;
  return taosStrdup(topicName);
}

624 625 626 627 628 629 630
/* Free callback for arrays of pointers: param points at an element slot; frees the pointer stored there. */
static void freeItem(void* param) {
  void** ppSlot = (void**)param;
  if (*ppSlot != NULL) {
    taosMemoryFree(*ppSlot);
  }
}

631 632 633 634
/*
 * Handle TDMT_MND_TMQ_SUBSCRIBE.
 *
 * The requested topic list is sorted and de-duplicated. Every topic is then checked for existence
 * and subscribe privilege, and:
 *   - for an unknown consumer, a new consumer object is created with every requested topic in
 *     rebNewTopics;
 *   - for an existing READY consumer, the sorted current and requested lists are merged to
 *     compute rebNewTopics / rebRemovedTopics.
 * The update is persisted through a "subscribe" transaction (retry policy).
 *
 * Returns:
 *   - TSDB_CODE_ACTION_IN_PROGRESS when a transaction was prepared;
 *   - 0 when an existing consumer re-subscribed to exactly its current topics;
 *   - -1 on any failure.
 * Failure paths after validation used to return 0 because `code` was overwritten by the
 * validation result.
 */
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  char   *msgStr = pMsg->pCont;

  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
  if (subscribe.topicNames == NULL) {
    // Without a decoded topic list nothing below can proceed (taosArraySort would dereference NULL).
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  uint64_t        consumerId = subscribe.consumerId;
  char           *cgroup = subscribe.cgroup;
  SMqConsumerObj *pExistedConsumer = NULL;
  SMqConsumerObj *pConsumerNew = NULL;
  STrans         *pTrans = NULL;

  // Stays -1 on every failure path; only set to a non-error value once the request is handled.
  int32_t code = -1;

  // The merge below relies on a sorted, duplicate-free topic list.
  SArray *pTopicList = subscribe.topicNames;
  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);

  int32_t newTopicNum = taosArrayGetSize(pTopicList);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
  if (pTrans == NULL) {
    goto _over;
  }

  // check topic existence and subscribe privilege
  if (validateTopics(pTopicList, pMnode, pMsg->info.conn.user) != TSDB_CODE_SUCCESS) {
    goto _over;
  }

  pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pExistedConsumer == NULL) {
    mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s, numOfTopics:%d", consumerId,
        subscribe.cgroup, (int32_t) taosArrayGetSize(pTopicList));

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      goto _over;
    }
    tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    // all subscribed topics should re-balance; the requested list is handed over to the new object
    taosArrayDestroy(pConsumerNew->rebNewTopics);
    pConsumerNew->rebNewTopics = pTopicList;
    subscribe.topicNames = NULL;

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;

  } else {
    // NOTE(review): currentTopics is read below without pExistedConsumer->lock, while the mq timer
    // reads consumer topic lists under that lock — confirm this is safe.
    int32_t status = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive subscribe request from existed consumer:0x%" PRIx64 " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe.cgroup, status,mndConsumerStatusName(status), newTopicNum);

    if (status != MQ_CONSUMER_STATUS__READY) {
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto _over;
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      goto _over;
    }

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    int32_t oldTopicNum = (pExistedConsumer->currentTopics)? taosArrayGetSize(pExistedConsumer->currentTopics):0;

    // Merge the two sorted lists: topics only in the new list are added, topics only in the old list are removed.
    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
        char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
        continue;
      } else if (j >= newTopicNum) {
        char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
        continue;
      } else {
        char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
        char *newTopic = taosArrayGetP(pTopicList, j);
        int   comp = strcmp(oldTopic, newTopic);
        if (comp == 0) {
          i++;
          j++;
          continue;
        } else if (comp < 0) {
          char *oldTopicCopy = taosStrdup(oldTopic);
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
          continue;
        } else {
          char *newTopicCopy = taosStrdup(newTopic);
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
          continue;
        }
      }
    }

    // no topics need to be rebalanced: the request is already satisfied
    if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
      code = TSDB_CODE_SUCCESS;
      goto _over;
    }

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  mndTransDrop(pTrans);

  if (pExistedConsumer) {
    mndReleaseConsumer(pMnode, pExistedConsumer);
  }

  if (pConsumerNew) {
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
  }

  // TODO: replace with destroy subscribe msg
  taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
  return code;
}

L
Liu Jicong 已提交
769 770
/*
 * Serialize pConsumer into an SDB_CONSUMER raw row (version MND_CONSUMER_VER_NUMBER).
 * Layout: int32 payload length, the tEncodeSMqConsumerObj payload, MND_CONSUMER_RESERVE_SIZE reserve bytes.
 * Returns the raw row, or NULL with terrno set (defaults to TSDB_CODE_OUT_OF_MEMORY).
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  // Assume failure until every step below has succeeded.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void    *pPayload = NULL;
  int32_t  payloadLen = tEncodeSMqConsumerObj(NULL, pConsumer);
  int32_t  rawSize = sizeof(int32_t) + payloadLen + MND_CONSUMER_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    goto CM_ENCODE_OVER;
  }

  pPayload = taosMemoryMalloc(payloadLen);
  if (pPayload == NULL) {
    goto CM_ENCODE_OVER;
  }

  void *pCursor = pPayload;
  tEncodeSMqConsumerObj(&pCursor, pConsumer);

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, payloadLen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pPayload, payloadLen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(pPayload);
  if (terrno == 0) {
    mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
    return pRaw;
  }

  mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

L
Liu Jicong 已提交
805
/*
 * Deserialize an SDB_CONSUMER raw row (see mndConsumerActionEncode for the layout) into a new
 * SSdbRow holding an SMqConsumerObj. It also refreshes the consumer's dnode ep set.
 *
 * Returns the row, or NULL with terrno set on failure: version mismatch gives
 * TSDB_CODE_SDB_INVALID_DATA_VER; otherwise the callee's code or TSDB_CODE_OUT_OF_MEMORY.
 *
 * terrno is initialised on entry and reset to success only after every step has passed. A stale
 * non-zero terrno left by an earlier call therefore can no longer make a successful decode free
 * its row and return NULL. This mirrors mndConsumerActionEncode.
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *buf = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }

  if (sver != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  int32_t dataPos = 0;
  int32_t len;
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

  terrno = TSDB_CODE_SUCCESS;

CM_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}

L
Liu Jicong 已提交
860
/*
 * Insert hook for a consumer row loaded from sdb.
 * Logs the row and initialises subscribeTime from the recorded upTime.
 * Always returns 0.
 */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  const char *pStatusName = mndConsumerStatusName(pConsumer->status);
  mDebug("consumer:0x%" PRIx64 " cgroup:%s status:%d(%s) epoch:%d load from sdb, perform insert action",
         pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, pStatusName, pConsumer->epoch);

  // subscribeTime is not taken from the raw row here; it starts out equal to upTime
  pConsumer->subscribeTime = pConsumer->upTime;
  return 0;
}

L
Liu Jicong 已提交
868
/*
 * Delete hook for a consumer row.
 * Logs the current status, then hands the object to tDeleteSMqConsumerObj for teardown.
 * Always returns 0.
 */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  int32_t     curStatus = pConsumer->status;
  const char *pCurName = mndConsumerStatusName(curStatus);
  mDebug("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, curStatus, pCurName);

  tDeleteSMqConsumerObj(pConsumer);
  return 0;
}

875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921
/*
 * Recompute pConsumer->status from its pending rebalance work.
 *
 * "Pending" means rebNewTopics or rebRemovedTopics is non-empty.
 *  - MODIFY / MODIFY_IN_REB -> MODIFY if pending, otherwise READY.
 *  - LOST   / LOST_IN_REB   -> LOST   if pending, otherwise LOST_REBD.
 *  - Any other status is left untouched.
 */
static void updateConsumerStatus(SMqConsumerObj* pConsumer) {
  int32_t cur = pConsumer->status;
  bool    pending =
      taosArrayGetSize(pConsumer->rebNewTopics) != 0 || taosArrayGetSize(pConsumer->rebRemovedTopics) != 0;

  bool isModifying = (cur == MQ_CONSUMER_STATUS__MODIFY || cur == MQ_CONSUMER_STATUS__MODIFY_IN_REB);
  bool isLost = (cur == MQ_CONSUMER_STATUS__LOST || cur == MQ_CONSUMER_STATUS__LOST_IN_REB);

  if (isModifying) {
    pConsumer->status = pending ? MQ_CONSUMER_STATUS__MODIFY : MQ_CONSUMER_STATUS__READY;
  } else if (isLost) {
    pConsumer->status = pending ? MQ_CONSUMER_STATUS__LOST : MQ_CONSUMER_STATUS__LOST_REBD;
  }
}

/*
 * Remove the first entry equal to pTopic from pConsumer->rebNewTopics and free that entry's string.
 * Does nothing if pTopic is NULL or not present.
 * pTopic must not point at an element of rebNewTopics itself: that element is freed before pTopic is logged.
 */
static void removeFromNewTopicList(SMqConsumerObj* pConsumer, const char* pTopic) {
  if (pTopic == NULL) {
    return;
  }

  // the loop stops right after the single removal, so the size can be read once up front
  int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
  for (int32_t i = 0; i < size; i++) {
    char *p = taosArrayGetP(pConsumer->rebNewTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebNewTopics, i);
      taosMemoryFree(p);

      mDebug("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
             pTopic, (int) taosArrayGetSize(pConsumer->rebNewTopics));
      break;
    }
  }
}

/*
 * Remove the first entry equal to pTopic from pConsumer->rebRemovedTopics and free that entry's string.
 * Does nothing if pTopic is not present.
 */
static void removeFromRemoveTopicList(SMqConsumerObj* pConsumer, const char* pTopic) {
  SArray *pList = pConsumer->rebRemovedTopics;
  int32_t numOfTopics = taosArrayGetSize(pList);

  // locate the first matching entry
  int32_t idx = 0;
  while (idx < numOfTopics && strcmp(pTopic, taosArrayGetP(pList, idx)) != 0) {
    idx++;
  }

  if (idx < numOfTopics) {
    char *pFound = taosArrayGetP(pList, idx);
    taosArrayRemove(pList, idx);
    taosMemoryFree(pFound);
  }
}

L
Liu Jicong 已提交
922
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
X
Xiaoyu Wang 已提交
923 924
  mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64,
         pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime);
L
Liu Jicong 已提交
925

926 927 928
  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
L
Liu Jicong 已提交
929 930
    /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
    /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
931

932
    // this new consumer has identical topics with one existed consumers.
933 934 935 936 937 938 939 940 941 942
    if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
    } else {
      SArray *tmp = pOldConsumer->rebNewTopics;
      pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
      pNewConsumer->rebNewTopics = tmp;

      tmp = pOldConsumer->rebRemovedTopics;
      pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
      pNewConsumer->rebRemovedTopics = tmp;
943

944 945 946
      tmp = pOldConsumer->assignedTopics;
      pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
      pNewConsumer->assignedTopics = tmp;
L
Liu Jicong 已提交
947

948 949 950
      pOldConsumer->subscribeTime = pNewConsumer->upTime;
      pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
    }
951
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
L
Liu Jicong 已提交
952 953
    /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
    /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
L
Liu Jicong 已提交
954

955
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
L
Liu Jicong 已提交
956
    /*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/
957
    for (int32_t i = 0; i < sz; i++) {
958
      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
L
Liu Jicong 已提交
959
      taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
960
    }
L
Liu Jicong 已提交
961 962 963

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

H
Haojun Liao 已提交
964
    int32_t status = pOldConsumer->status;
965
    pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
H
Haojun Liao 已提交
966 967 968
    mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
           pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
           pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
L
Liu Jicong 已提交
969
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
L
Liu Jicong 已提交
970 971
    /*A(taosArrayGetSize(pOldConsumer->currentTopics) == 0);*/
    /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
L
Liu Jicong 已提交
972 973 974

    int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
975
      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
L
Liu Jicong 已提交
976 977 978 979 980 981
      taosArrayPush(pOldConsumer->rebNewTopics, &topic);
    }

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
982 983
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
L
Liu Jicong 已提交
984 985 986

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

987
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
988 989
    ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);
    char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
990 991

    // not exist in current topic
992
    bool    existing = false;
H
Haojun Liao 已提交
993 994
    int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics);
    for (int32_t i = 0; i < numOfExistedTopics; i++) {
995
      char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
996
      if (strcmp(topic, pNewTopic) == 0) {
L
Liu Jicong 已提交
997 998
        existing = true;
      }
999 1000
    }

1001
    removeFromNewTopicList(pOldConsumer, pNewTopic);
1002 1003

    // add to current topic
L
Liu Jicong 已提交
1004
    if (!existing) {
1005
      taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
L
Liu Jicong 已提交
1006
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
M
Minglei Jin 已提交
1007
    } else {
1008
      taosMemoryFree(pNewTopic);
L
Liu Jicong 已提交
1009
    }
H
Haojun Liao 已提交
1010

1011
    // set status
H
Haojun Liao 已提交
1012
    int32_t status = pOldConsumer->status;
1013
    updateConsumerStatus(pOldConsumer);
L
Liu Jicong 已提交
1014

H
Haojun Liao 已提交
1015
    // the re-balance is triggered when the new consumer is launched.
L
Liu Jicong 已提交
1016 1017
    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

1018
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
1019 1020
    mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
           ", current topics:%d, newTopics:%d, removeTopics:%d",
H
Haojun Liao 已提交
1021
           pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
1022 1023 1024 1025
           mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
           (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
           (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

1026
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
L
Liu Jicong 已提交
1027 1028
    /*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/
    /*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/
1029
    char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
L
Liu Jicong 已提交
1030
#if 0
1031 1032
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
L
Liu Jicong 已提交
1033
      A(strcmp(topic, removedTopic) != 0);
1034 1035 1036 1037
    }
#endif

    // remove from removed topic
1038
    removeFromRemoveTopicList(pOldConsumer, removedTopic);
1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052

    // remove from current topic
    int32_t i = 0;
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
    for (i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
      if (strcmp(removedTopic, topic) == 0) {
        taosArrayRemove(pOldConsumer->currentTopics, i);
        taosMemoryFree(topic);
        break;
      }
    }

    // set status
H
Haojun Liao 已提交
1053
    int32_t status = pOldConsumer->status;
1054
    updateConsumerStatus(pOldConsumer);
L
Liu Jicong 已提交
1055 1056

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;
1057
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
H
Haojun Liao 已提交
1058

1059 1060
    mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
               ", current topics:%d, newTopics:%d, removeTopics:%d",
H
Haojun Liao 已提交
1061
           pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
1062 1063 1064
           mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
           (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
           (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
1065 1066 1067
  }

  taosWUnLockLatch(&pOldConsumer->lock);
L
Liu Jicong 已提交
1068 1069 1070
  return 0;
}

L
Liu Jicong 已提交
1071
/*
 * Acquire the consumer row with the given id from sdb.
 * Returns the object on success; the caller must hand it back with mndReleaseConsumer().
 * Returns NULL and sets terrno to TSDB_CODE_MND_CONSUMER_NOT_EXIST when it is not found.
 */
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerObj *pConsumer = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (pConsumer != NULL) {
    return pConsumer;
  }

  terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
1080
/* Return a consumer obtained from mndAcquireConsumer() to sdb. */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}
L
Liu Jicong 已提交
1084

S
Shengliang Guan 已提交
1085 1086
/*
 * Fill pBlock with one row per (consumer, assigned topic) pair for the consumer show command.
 *
 * Columns, in order: consumer id, consumer group, client id, status name, topic show-name, up time,
 * subscribe time, and rebalance time (NULL when 0). Consumers without assigned topics are skipped.
 *
 * Iteration resumes from pShow->pIter. A consumer's rows are never split: the block is grown when one consumer
 * needs more rows than rowsCapacity leaves. Returns the number of rows written and adds it to pShow->numOfRows.
 */
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    // assignedTopics can be swapped by mndConsumerActionUpdate under the write latch,
    // so it is only read while holding the read latch.
    taosRLockLatch(&pConsumer->lock);

    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    if (topicSz == 0) {
      mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      taosRUnLockLatch(&pConsumer->lock);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    if (numOfRows + topicSz > rowsCapacity) {
      int32_t code = blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
      if (code != TSDB_CODE_SUCCESS) {
        // writing topicSz rows without the extra capacity would overflow the block
        mError("showing consumer:0x%" PRIx64 " failed to enlarge result block to %d rows, code:0x%x",
               pConsumer->consumerId, numOfRows + topicSz, code);
        taosRUnLockLatch(&pConsumer->lock);
        sdbRelease(pSdb, pConsumer);
        break;
      }
    }

    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // consumer id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);

      // consumer group
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);

      // client id
      char clientId[256 + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(clientId, pConsumer->clientId);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);

      // status
      char        status[20 + VARSTR_HEADER_SIZE] = {0};
      const char *pStatusName = mndConsumerStatusName(pConsumer->status);
      STR_TO_VARSTR(status, pStatusName);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)status, false);

      // one subscribed topic
      char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
      STR_TO_VARSTR(topic, topicName);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);

      // rebalance time; 0 means never rebalanced and is shown as NULL
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);

      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);

    pBlock->info.rows = numOfRows;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

/* Abort an in-progress consumer iteration started by mndRetrieveConsumer via sdbFetch. */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

/*
 * Map a consumer status code to the name shown to users:
 * "ready", "lost" (all LOST variants), "rebalancing" (MODIFY variants), or "unknown" for anything else.
 */
static const char *mndConsumerStatusName(int status) {
  const char *pName = "unknown";

  switch (status) {
    case MQ_CONSUMER_STATUS__READY:
      pName = "ready";
      break;
    case MQ_CONSUMER_STATUS__LOST:
    case MQ_CONSUMER_STATUS__LOST_REBD:
    case MQ_CONSUMER_STATUS__LOST_IN_REB:
      pName = "lost";
      break;
    case MQ_CONSUMER_STATUS__MODIFY:
    case MQ_CONSUMER_STATUS__MODIFY_IN_REB:
      pName = "rebalancing";
      break;
    default:
      break;
  }

  return pName;
}