mndSubscribe.c 31.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndSubscribe.h"
L
Liu Jicong 已提交
18 19 20 21
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
22
#include "mndOffset.h"
L
Liu Jicong 已提交
23
#include "mndScheduler.h"
L
Liu Jicong 已提交
24 25 26 27 28 29 30 31 32
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

L
Liu Jicong 已提交
33
/* Software version written into every SDB_SUBSCRIBE raw record; mndSubActionDecode rejects any other version. */
#define MND_SUBSCRIBE_VER_NUMBER   1
L
Liu Jicong 已提交
34 35
/* Reserved bytes appended to each raw record after the encoded object (presumably room for future fields). */
#define MND_SUBSCRIBE_RESERVE_SIZE 64

L
Liu Jicong 已提交
36
/*
 * The MQ timer increments each consumer's heartbeat counter on every tick and a
 * get-sub-ep request resets it to 0; once the counter exceeds this value an
 * ACTIVE consumer is switched to LOST and its vgroups become eligible for rebalance.
 */
#define MND_SUBSCRIBE_REBALANCE_CNT 3
L
Liu Jicong 已提交
37

L
Liu Jicong 已提交
38 39 40 41 42
/* Subscription status codes. */
enum {
  MQ_SUBSCRIBE_STATUS__ACTIVE = 1,
  MQ_SUBSCRIBE_STATUS__DELETED,
};

L
Liu Jicong 已提交
43
/* Builds the subscribe key "<cgroup><TMQ_SEPARATOR><topicName>" into key; defined below. */
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45 46 47 48 49 50
/* SDB_SUBSCRIBE table callbacks, registered with sdbSetTable() in mndInitSubscribe. */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);

S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58
/*
 * Message handlers. NOTE(review): mndProcessSubscribeRsp, mndProcessSubscribeInternalReq and
 * mndProcessResetOffsetReq are declared here but are not registered in mndInitSubscribe;
 * confirm whether they are still needed.
 */
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeRsp(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalReq(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg);
static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg);
static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg);
static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg);
L
Liu Jicong 已提交
59

S
Shengliang Guan 已提交
60 61
/* Helpers that each append one vnode redo action (set-conn / rebalance / cancel-conn) to a transaction. */
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
                                      const SMqConsumerEp *pConsumerEp);
L
Liu Jicong 已提交
62

L
Liu Jicong 已提交
63
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp);
L
Liu Jicong 已提交
64
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName);
L
Liu Jicong 已提交
65

L
Liu Jicong 已提交
66 67 68 69 70 71 72 73 74 75
/*
 * Register the subscribe module with the mnode.
 *
 * Installs handlers for:
 * - client subscribe requests;
 * - the vnode set-conn / rebalance / cancel-conn responses;
 * - the MQ timer;
 * - get-sub-ep requests;
 * - rebalance execution.
 *
 * Then registers the SDB_SUBSCRIBE table (binary keys) with this file's
 * encode/decode/insert/update/delete callbacks.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_CANCEL_CONN_RSP, mndProcessSubscribeInternalRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);

  SSdbTable subTable = {0};
  subTable.sdbType = SDB_SUBSCRIBE;
  subTable.keyType = SDB_KEY_BINARY;
  subTable.encodeFp = (SdbEncodeFp)mndSubActionEncode;
  subTable.decodeFp = (SdbDecodeFp)mndSubActionDecode;
  subTable.insertFp = (SdbInsertFp)mndSubActionInsert;
  subTable.updateFp = (SdbUpdateFp)mndSubActionUpdate;
  subTable.deleteFp = (SdbDeleteFp)mndSubActionDelete;

  return sdbSetTable(pMnode->pSdb, subTable);
}

L
Liu Jicong 已提交
85
/*
 * Allocate a new subscription object for (cgroup, topic) and set its key to
 * "<cgroup><TMQ_SEPARATOR><topic name>". Its endpoints are then initialized by
 * mndSchedInitSubEp().
 *
 * Returns the new object, or NULL on failure. terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY here only when the object allocation itself fails.
 * On a scheduler failure the partially built object is freed.
 */
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *cgroup) {
  SMqSubscribeObj *pNewSub = tNewSubscribeObj();
  if (pNewSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  char subKey[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(subKey, cgroup, pTopic->name);
  strcpy(pNewSub->key, subKey);

  if (mndSchedInitSubEp(pMnode, pTopic, pNewSub) >= 0) {
    // TODO: disable alter subscribed table
    return pNewSub;
  }

  // tDeleteSMqSubscribeObj() releases the contents; the struct itself is freed separately.
  tDeleteSMqSubscribeObj(pNewSub);
  taosMemoryFree(pNewSub);
  return NULL;
}

L
Liu Jicong 已提交
105
/*
 * Serialize an SMqMVRebReq for pConsumerEp into a newly allocated buffer.
 * The request hands vgroup vgId from oldConsumerId to consumerId.
 *
 * Buffer layout: SMsgHead (contLen and vgId in network byte order) followed by
 * the encoded request.
 *
 * On success *pBuf owns the buffer and *pLen is the encoded body length, which
 * excludes the SMsgHead. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if
 * the allocation fails.
 */
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
  SMqMVRebReq rebReq = {
      .vgId = pConsumerEp->vgId,
      .oldConsumerId = pConsumerEp->oldConsumerId,
      .newConsumerId = pConsumerEp->consumerId,
  };

  int32_t bodyLen = tEncodeSMqMVRebReq(NULL, &rebReq);
  void   *msgBuf = taosMemoryMalloc(sizeof(SMsgHead) + bodyLen);
  if (msgBuf != NULL) {
    SMsgHead *pHead = (SMsgHead *)msgBuf;
    pHead->contLen = htonl(sizeof(SMsgHead) + bodyLen);
    pHead->vgId = htonl(pConsumerEp->vgId);

    void *pBody = POINTER_SHIFT(msgBuf, sizeof(SMsgHead));
    tEncodeSMqMVRebReq(&pBody, &rebReq);

    *pBuf = msgBuf;
    *pLen = bodyLen;
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

L
Liu Jicong 已提交
132
/*
 * Append a TDMT_VND_MQ_REB redo action to pTrans. The action asks the vnode of
 * pConsumerEp->vgId to move that vgroup from oldConsumerId to consumerId.
 *
 * Only valid for a handover (oldConsumerId != -1). Vgroups without a previous
 * owner go through mndPersistMqSetConnReq instead.
 *
 * Returns 0 on success and -1 on failure; the message buffer is freed on every
 * failure path.
 */
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
  ASSERT(pConsumerEp->oldConsumerId != -1);

  void   *buf = NULL;
  int32_t tlen = 0;
  if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp) < 0) {
    return -1;
  }

  int32_t vgId = pConsumerEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
  if (pVgObj == NULL) {
    // Without the vgroup there is no epset to address; fail rather than pass NULL to
    // mndGetVgroupEpset. NOTE(review): terrno is assumed to be set by mndAcquireVgroup.
    mError("mq rebalance: vgroup %d not found, cannot send rebalance msg", vgId);
    taosMemoryFree(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_REB;

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
159 160
/*
 * Serialize an SMqCancelConnReq into a newly allocated buffer. The request
 * carries pConsumerEp's consumerId, vgId and epoch, plus the topic
 * oldTopicName.
 *
 * Buffer layout: SMsgHead (contLen and vgId in network byte order) followed by
 * the encoded request.
 *
 * On success *pBuf owns the buffer and *pLen is the body length, which excludes
 * the SMsgHead. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the
 * allocation fails.
 */
static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, const char* oldTopicName) {
  SMqCancelConnReq cancelReq = {0};
  cancelReq.consumerId = pConsumerEp->consumerId;
  cancelReq.vgId = pConsumerEp->vgId;
  cancelReq.epoch = pConsumerEp->epoch;
  strcpy(cancelReq.topicName, oldTopicName);

  int32_t bodyLen = tEncodeSMqCancelConnReq(NULL, &cancelReq);
  void   *msgBuf = taosMemoryMalloc(sizeof(SMsgHead) + bodyLen);
  if (msgBuf != NULL) {
    SMsgHead *pHead = (SMsgHead *)msgBuf;
    pHead->contLen = htonl(sizeof(SMsgHead) + bodyLen);
    pHead->vgId = htonl(pConsumerEp->vgId);

    void *pBody = POINTER_SHIFT(msgBuf, sizeof(SMsgHead));
    tEncodeSMqCancelConnReq(&pBody, &cancelReq);

    *pBuf = msgBuf;
    *pLen = bodyLen;
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

L
Liu Jicong 已提交
183
/*
 * Append a TDMT_VND_MQ_CANCEL_CONN redo action to pTrans. The action tells the
 * vnode of pConsumerEp->vgId to drop the consumer's connection for topic
 * oldTopicName.
 *
 * Returns 0 on success and -1 on failure; the message buffer is freed on every
 * failure path.
 */
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName) {
  void   *buf = NULL;
  int32_t tlen = 0;
  if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp, oldTopicName) < 0) {
    return -1;
  }

  int32_t vgId = pConsumerEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
  if (pVgObj == NULL) {
    // Without the vgroup there is no epset to address; fail rather than pass NULL to
    // mndGetVgroupEpset. NOTE(review): terrno is assumed to be set by mndAcquireVgroup.
    mError("mq cancel conn: vgroup %d not found, topic %s", vgId, oldTopicName);
    taosMemoryFree(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_CANCEL_CONN;

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
208 209
/*
 * Handle TDMT_MND_GET_SUB_EP: a consumer asks the mnode for its assignment.
 *
 * The consumer's heartbeat counter (hbStatus) is reset to 0.
 *
 * If the request's epoch differs from the consumer's server-side epoch, the
 * response lists every topic in the consumer's currentTopics. For each topic it
 * gives the vgroups assigned to this consumer and each vgroup's stored offset
 * for (cgroup, topic, vgId), or -1 when none is stored. When the epochs match,
 * no topics are returned.
 *
 * On success pMsg->pRsp / pMsg->rspLen receive an rpcMallocCont buffer: an
 * SMqRspHead (type TMQ_MSG_TYPE__EP_RSP, server epoch, consumer id) followed by
 * the encoded SMqCMGetSubEpRsp, and 0 is returned.
 *
 * Returns -1 with terrno set when the consumer does not exist or an allocation
 * fails. Every acquired reference is released on all paths.
 */
static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
  SMnode           *pMnode = pMsg->pNode;
  SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
  SMqCMGetSubEpRsp  rsp = {0};
  int64_t           consumerId = be64toh(pReq->consumerId);
  int32_t           epoch = ntohl(pReq->epoch);

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }
  // TODO add lock
  ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
  int32_t serverEpoch = pConsumer->epoch;

  // TODO
  int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
  mDebug("consumer %" PRId64 " epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch,
         serverEpoch, hbStatus);
  // this request counts as a heartbeat: the MQ timer marks consumers lost only when hbStatus keeps growing
  atomic_store_32(&pConsumer->hbStatus, 0);

  strcpy(rsp.cgroup, pReq->cgroup);
  if (epoch != serverEpoch) {
    mInfo("send new assignment to consumer %" PRId64 ", consumer epoch %d, server epoch %d", pConsumer->consumerId,
          epoch, serverEpoch);
    mDebug("consumer %" PRId64 " try r lock", consumerId);
    taosRLockLatch(&pConsumer->lock);
    mDebug("consumer %" PRId64 " r locked", consumerId);

    SArray *pTopics = pConsumer->currentTopics;
    int32_t sz = taosArrayGetSize(pTopics);
    rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      taosRUnLockLatch(&pConsumer->lock);
      mndReleaseConsumer(pMnode, pConsumer);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    for (int32_t i = 0; i < sz; i++) {
      char            *topicName = taosArrayGetP(pTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
      if (pSub == NULL) {
        // skip the topic instead of dereferencing a missing subscription
        mError("consumer %" PRId64 " has topic %s but its subscription does not exist", consumerId, topicName);
        continue;
      }

      int32_t csz = taosArrayGetSize(pSub->consumers);
      // TODO: change to bsearch
      for (int32_t j = 0; j < csz; j++) {
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
        if (consumerId != pSubConsumer->consumerId) continue;

        int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
        mInfo("topic %s has %d vg", topicName, vgsz);

        SMqSubTopicEp topicEp = {0};
        strcpy(topicEp.topic, topicName);
        topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
        for (int32_t k = 0; k < vgsz; k++) {
          char           offsetKey[TSDB_PARTITION_KEY_LEN];
          SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
          SMqSubVgEp     vgEp = {
                  .epSet = pConsumerEp->epSet,
                  .vgId = pConsumerEp->vgId,
                  .offset = -1,
          };

          // use the stored offset of this (cgroup, topic, vgroup) partition when there is one
          mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId);
          SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
          if (pOffsetObj != NULL) {
            vgEp.offset = pOffsetObj->offset;
            mndReleaseOffset(pMnode, pOffsetObj);
          }
          taosArrayPush(topicEp.vgs, &vgEp);
        }
        taosArrayPush(rsp.topics, &topicEp);
        break;
      }
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
    mDebug("consumer %" PRId64 " r unlock", consumerId);
  }

  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    // release what was built so far; previously both the rsp and the consumer ref leaked here
    tDeleteSMqCMGetSubEpRsp(&rsp);
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = serverEpoch;
  ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;

  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
  tDeleteSMqCMGetSubEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);

  pMsg->pRsp = buf;
  pMsg->rspLen = tlen;
  return 0;
}

L
Liu Jicong 已提交
299
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) {
S
Shengliang Guan 已提交
300
  int32_t i = 0;
L
Liu Jicong 已提交
301
  while (key[i] != TMQ_SEPARATOR) {
L
Liu Jicong 已提交
302 303
    i++;
  }
L
Liu Jicong 已提交
304 305 306
  memcpy(cgroup, key, i);
  cgroup[i] = 0;
  strcpy(topic, &key[i + 1]);
L
Liu Jicong 已提交
307 308 309
  return 0;
}

L
Liu Jicong 已提交
310
/*
 * Return the rebalance entry for subscribe key `key` in pHash. If none exists
 * yet, an empty one is created and inserted first.
 *
 * The hash stores SMqRebSubscribe by value (a shallow copy sharing the key and
 * array pointers). The returned pointer therefore refers to the copy held by
 * the hash, not to a separate heap object. The temporary wrapper returned by
 * tNewSMqRebSubscribe is freed once its contents have been copied in;
 * previously it was returned and leaked.
 *
 * Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY if creation or insertion
 * fails.
 */
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t           keyLen = strlen(key) + 1;
  SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub != NULL) {
    return pRebSub;
  }

  SMqRebSubscribe *pNew = tNewSMqRebSubscribe(key);
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (taosHashPut(pHash, key, keyLen, pNew, sizeof(SMqRebSubscribe)) != 0) {
    // the hash took nothing: release the members as well as the wrapper
    taosMemoryFree(pNew->key);
    taosArrayDestroy(pNew->lostConsumers);
    taosArrayDestroy(pNew->removedConsumers);
    taosArrayDestroy(pNew->newConsumers);
    taosMemoryFree(pNew);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // the hash now owns a shallow copy (same key/array pointers); only the wrapper is ours to free
  taosMemoryFree(pNew);
  return taosHashGet(pHash, key, keyLen);
}

S
Shengliang Guan 已提交
323 324
/*
 * Periodic MQ timer (TDMT_MND_MQ_TIMER). Collects rebalance work per
 * subscription. For every consumer in SDB_CONSUMER:
 *  - its heartbeat counter is incremented; once the counter exceeds
 *    MND_SUBSCRIBE_REBALANCE_CNT, an ACTIVE consumer is switched to LOST and
 *    recorded as lost for each of its current topics;
 *  - a consumer in INIT state is recorded as new for each of its current topics;
 *  - a consumer in MODIFY state is recorded as removed for each topic in
 *    recentRemovedTopics; that list is then freed and cleared.
 *
 * If anything was recorded, the hash (subscribe key -> SMqRebSubscribe) is
 * posted to the write queue as TDMT_MND_MQ_DO_REBALANCE. Otherwise it is freed
 * here.
 *
 * Each consumer obtained from sdbFetch is released at the end of its iteration;
 * previously every fetched reference leaked.
 */
static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
  SMnode            *pMnode = pMsg->pNode;
  SSdb              *pSdb = pMnode->pSdb;
  SMqConsumerObj    *pConsumer = NULL;
  void              *pIter = NULL;
  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
  if (pRebMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  if (pRebMsg->rebSubHash == NULL) {
    rpcFreeCont(pRebMsg);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) {
      int32_t old =
          atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
      if (old == MQ_CONSUMER_STATUS__ACTIVE) {
        // record the consumer as lost for every topic it currently subscribes to
        int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
        for (int32_t i = 0; i < sz; i++) {
          char *topic = taosArrayGetP(pConsumer->currentTopics, i);
          char  key[TSDB_SUBSCRIBE_KEY_LEN];
          mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
          SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
          if (pRebSub == NULL) {
            mError("mq timer: failed to record lost consumer %" PRId64 " for %s since %s", pConsumer->consumerId, key,
                   terrstr());
            continue;
          }
          taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
        }
      }
    }

    int32_t status = atomic_load_32(&pConsumer->status);
    if (status == MQ_CONSUMER_STATUS__INIT || status == MQ_CONSUMER_STATUS__MODIFY) {
      SArray *rebSubs;
      if (status == MQ_CONSUMER_STATUS__INIT) {
        rebSubs = pConsumer->currentTopics;
      } else {
        rebSubs = pConsumer->recentRemovedTopics;
      }

      int32_t sz = taosArrayGetSize(rebSubs);
      for (int32_t i = 0; i < sz; i++) {
        char *topic = taosArrayGetP(rebSubs, i);
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
        SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
        if (pRebSub == NULL) {
          mError("mq timer: failed to record consumer %" PRId64 " for %s since %s", pConsumer->consumerId, key,
                 terrstr());
          continue;
        }
        if (status == MQ_CONSUMER_STATUS__INIT) {
          taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
        } else if (status == MQ_CONSUMER_STATUS__MODIFY) {
          taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
        }
      }

      if (status == MQ_CONSUMER_STATUS__MODIFY) {
        // the removed topics have been recorded; free their names and empty the list
        int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics);
        for (int32_t i = 0; i < removeSz; i++) {
          char *topicName = taosArrayGetP(pConsumer->recentRemovedTopics, i);
          taosMemoryFree(topicName);
        }
        taosArrayClear(pConsumer->recentRemovedTopics);
      }
    }

    // sdbFetch took a reference on the consumer; give it back before the next fetch
    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_MQ_DO_REBALANCE,
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
  }

  return 0;
}

S
Shengliang Guan 已提交
395 396
static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
  SMnode            *pMnode = pMsg->pNode;
S
Shengliang Guan 已提交
397 398
  SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont;
  STrans            *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
L
Liu Jicong 已提交
399 400 401 402 403 404 405 406 407
  void              *pIter = NULL;

  mInfo("mq rebalance start");

  while (1) {
    pIter = taosHashIterate(pReq->rebSubHash, pIter);
    if (pIter == NULL) break;
    SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
wafwerar's avatar
wafwerar 已提交
408
    taosMemoryFreeClear(pRebSub->key);
L
Liu Jicong 已提交
409

L
Liu Jicong 已提交
410
    mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum, (int32_t)taosArrayGetSize(pSub->unassignedVg));
L
Liu Jicong 已提交
411 412

    // remove lost consumer
S
Shengliang Guan 已提交
413
    for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
L
Liu Jicong 已提交
414 415
      int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i);

416
      mInfo("mq remove lost consumer %" PRId64 "", lostConsumerId);
L
Liu Jicong 已提交
417

S
Shengliang Guan 已提交
418
      for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
L
Liu Jicong 已提交
419 420 421 422
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
        if (pSubConsumer->consumerId == lostConsumerId) {
          taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo);
          taosArrayPush(pSub->lostConsumers, pSubConsumer);
L
Liu Jicong 已提交
423 424 425 426 427 428 429 430
          taosArrayRemove(pSub->consumers, j);
          break;
        }
      }
    }

    // calculate rebalance
    int32_t consumerNum = taosArrayGetSize(pSub->consumers);
L
Liu Jicong 已提交
431 432 433
    if (consumerNum != 0) {
      int32_t vgNum = pSub->vgNum;
      int32_t vgEachConsumer = vgNum / consumerNum;
L
Liu Jicong 已提交
434 435 436
      int32_t imbalanceVg = vgNum % consumerNum;

      // iterate all consumers, set unassignedVgStash
S
Shengliang Guan 已提交
437
      for (int32_t i = 0; i < consumerNum; i++) {
L
Liu Jicong 已提交
438
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
S
Shengliang Guan 已提交
439 440
        int32_t         vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
        int32_t         vgThisConsumerAfterRb;
L
Liu Jicong 已提交
441 442 443 444
        if (i < imbalanceVg)
          vgThisConsumerAfterRb = vgEachConsumer + 1;
        else
          vgThisConsumerAfterRb = vgEachConsumer;
L
Liu Jicong 已提交
445

446
        mInfo("mq consumer:%" PRId64 ", connectted vgroup number change from %d to %d", pSubConsumer->consumerId,
L
Liu Jicong 已提交
447
              vgThisConsumerBeforeRb, vgThisConsumerAfterRb);
L
Liu Jicong 已提交
448

L
Liu Jicong 已提交
449
        while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
L
Liu Jicong 已提交
450 451 452
          SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
          ASSERT(pConsumerEp != NULL);
          ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
L
Liu Jicong 已提交
453
          taosArrayPush(pSub->unassignedVg, pConsumerEp);
L
Liu Jicong 已提交
454 455
        }

L
Liu Jicong 已提交
456
        SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
L
Liu Jicong 已提交
457 458 459
        mDebug("consumer %ld try w lock", pRebConsumer->consumerId);
        taosWLockLatch(&pRebConsumer->lock);
        mDebug("consumer %ld w locked", pRebConsumer->consumerId);
L
Liu Jicong 已提交
460 461 462 463
        int32_t         status = atomic_load_32(&pRebConsumer->status);
        if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
            (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
            (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
L
fix txn  
Liu Jicong 已提交
464 465 466
          /*if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {*/
            /*pRebConsumer->epoch++;*/
          /*}*/
L
Liu Jicong 已提交
467
          if (vgThisConsumerAfterRb != 0) {
L
fix txn  
Liu Jicong 已提交
468
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
L
Liu Jicong 已提交
469
          } else {
L
fix txn  
Liu Jicong 已提交
470
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
L
Liu Jicong 已提交
471
          }
L
Liu Jicong 已提交
472

L
fix txn  
Liu Jicong 已提交
473 474
          mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status,
                pRebConsumer->status);
L
Liu Jicong 已提交
475

L
fix txn  
Liu Jicong 已提交
476
          SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
L
Liu Jicong 已提交
477
          sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
L
fix txn  
Liu Jicong 已提交
478
          mndTransAppendCommitlog(pTrans, pConsumerRaw);
L
Liu Jicong 已提交
479
        }
L
Liu Jicong 已提交
480 481
        taosWUnLockLatch(&pRebConsumer->lock);
        mDebug("consumer %ld w unlock", pRebConsumer->consumerId);
L
Liu Jicong 已提交
482
        mndReleaseConsumer(pMnode, pRebConsumer);
L
Liu Jicong 已提交
483 484
      }

L
Liu Jicong 已提交
485
      // assign to vgroup
L
Liu Jicong 已提交
486
      if (taosArrayGetSize(pSub->unassignedVg) != 0) {
S
Shengliang Guan 已提交
487
        for (int32_t i = 0; i < consumerNum; i++) {
L
Liu Jicong 已提交
488
          SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
S
Shengliang Guan 已提交
489
          int32_t         vgThisConsumerAfterRb;
L
Liu Jicong 已提交
490 491 492 493
          if (i < imbalanceVg)
            vgThisConsumerAfterRb = vgEachConsumer + 1;
          else
            vgThisConsumerAfterRb = vgEachConsumer;
L
Liu Jicong 已提交
494

L
Liu Jicong 已提交
495 496
          while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
            SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
L
Liu Jicong 已提交
497 498 499 500
            ASSERT(pConsumerEp != NULL);

            pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
            pConsumerEp->consumerId = pSubConsumer->consumerId;
L
Liu Jicong 已提交
501 502
            //TODO
            pConsumerEp->epoch = 0;
L
Liu Jicong 已提交
503
            taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
L
Liu Jicong 已提交
504

505
            if (pConsumerEp->oldConsumerId == -1) {
L
Liu Jicong 已提交
506 507 508
              char topic[TSDB_TOPIC_FNAME_LEN];
              char cgroup[TSDB_CGROUP_LEN];
              mndSplitSubscribeKey(pSub->key, topic, cgroup);
L
Liu Jicong 已提交
509
              SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
510

L
Liu Jicong 已提交
511 512
              mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId,
                    topic, pConsumerEp->consumerId, cgroup);
513 514 515 516

              mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
              mndReleaseTopic(pMnode, pTopic);
            } else {
L
Liu Jicong 已提交
517 518
              mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "",
                    pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId);
L
Liu Jicong 已提交
519

520 521
              mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
            }
L
Liu Jicong 已提交
522 523 524
          }
        }
      }
L
Liu Jicong 已提交
525
      ASSERT(taosArrayGetSize(pSub->unassignedVg) == 0);
L
Liu Jicong 已提交
526 527 528

      // TODO: log rebalance statistics
      SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
L
add log  
Liu Jicong 已提交
529
      sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
L
Liu Jicong 已提交
530 531 532 533 534 535
      mndTransAppendRedolog(pTrans, pSubRaw);
    }
    mndReleaseSubscribe(pMnode, pSub);
  }
  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
536
    taosHashCleanup(pReq->rebSubHash);
L
Liu Jicong 已提交
537 538 539 540
    mndTransDrop(pTrans);
    return -1;
  }

L
Liu Jicong 已提交
541
  taosHashCleanup(pReq->rebSubHash);
L
Liu Jicong 已提交
542 543 544 545
  mndTransDrop(pTrans);
  return 0;
}

S
Shengliang Guan 已提交
546 547
/*
 * Append a TDMT_VND_MQ_SET_CONN redo action to pTrans. The action connects
 * consumer pConsumerEp->consumerId of cgroup to vgroup pConsumerEp->vgId for
 * pTopic. It carries the topic's sql, logical and physical plans, and the
 * endpoint's qmsg.
 *
 * Only valid for a vgroup without a previous owner (oldConsumerId == -1).
 *
 * Returns 0 on success and -1 on failure; the message buffer is freed on every
 * failure path.
 */
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
                                      const SMqConsumerEp *pConsumerEp) {
  ASSERT(pConsumerEp->oldConsumerId == -1);
  int32_t vgId = pConsumerEp->vgId;

  SMqSetCVgReq req = {
      .vgId = vgId,
      .consumerId = pConsumerEp->consumerId,
      .sql = pTopic->sql,
      .logicalPlan = pTopic->logicalPlan,
      .physicalPlan = pTopic->physicalPlan,
      .qmsg = pConsumerEp->qmsg,
  };
  strcpy(req.cgroup, cgroup);
  strcpy(req.topicName, pTopic->name);

  int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
  void   *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SMsgHead *pMsgHead = (SMsgHead *)buf;
  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(vgId);

  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSMqSetCVgReq(&abuf, &req);

  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
  if (pVgObj == NULL) {
    // Without the vgroup there is no epset to address; fail rather than pass NULL to
    // mndGetVgroupEpset. NOTE(review): terrno is assumed to be set by mndAcquireVgroup.
    mError("mq set conn: vgroup %d not found, topic %s", vgId, pTopic->name);
    taosMemoryFree(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_SET_CONN;

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }
  return 0;
}

/* Counterpart of mndInitSubscribe; nothing is released here. */
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;
}

/*
 * Serialize a subscription into an SDB_SUBSCRIBE raw record. Layout:
 *   int32 length | encoded SMqSubscribeObj | MND_SUBSCRIBE_RESERVE_SIZE reserved bytes
 *
 * Returns the raw record, or NULL with terrno set if allocation or encoding
 * fails; the partially built raw is freed in that case.
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void    *pEncoded = NULL;
  int32_t  objLen = tEncodeSubscribeObj(NULL, pSub);
  int32_t  rawSize = sizeof(int32_t) + objLen + MND_SUBSCRIBE_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  pEncoded = taosMemoryMalloc(objLen);
  if (pEncoded == NULL) goto SUB_ENCODE_OVER;

  void *pWrite = pEncoded;
  tEncodeSubscribeObj(&pWrite, pSub);

  int32_t writePos = 0;
  SDB_SET_INT32(pRaw, writePos, objLen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, writePos, pEncoded, objLen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, writePos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, writePos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(pEncoded);
  if (terrno == TSDB_CODE_SUCCESS) {
    mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
    return pRaw;
  }

  mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

/*
 * Decode an SDB_SUBSCRIBE raw record written by mndSubActionEncode into a new
 * SSdbRow holding an SMqSubscribeObj. Record layout:
 *   int32 length | encoded object | reserved bytes
 *
 * Returns NULL with terrno set when the software version is not
 * MND_SUBSCRIBE_VER_NUMBER or when allocation or decoding fails.
 *
 * NOTE(review): if tDecodeSubscribeObj fails part-way, whatever it allocated
 * inside pSub is not released here.
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Everything the error path touches is initialized before the first goto;
  // previously an early failure reached it with pRow/pSub uninitialized and
  // freed/dereferenced garbage.
  void            *buf = NULL;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  int32_t          dataPos = 0;
  int32_t          tlen = 0;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver != MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  int32_t size = sizeof(SMqSubscribeObj);
  pRow = sdbAllocRow(size);
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub) == NULL) {
    goto SUB_DECODE_OVER;
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub != NULL ? pSub->key : "", pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  return pRow;
}

/*
 * SDB insert hook for subscription rows. It only emits a trace line for the row's key;
 * nothing else is done here. Always returns 0.
 */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;  // unused by this hook
  mTrace("subscribe:%s, perform insert action", pSub->key);
  return 0;
}

/*
 * SDB delete hook for subscription rows. It traces the key and then hands the object to
 * tDeleteSMqSubscribeObj (presumably releasing the members it owns — confirm in its
 * definition). Always returns 0.
 */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;  // unused by this hook
  mTrace("subscribe:%s, perform delete action", pSub->key);

  tDeleteSMqSubscribeObj(pSub);
  return 0;
}

/*
 * SDB update hook for subscription rows. It only emits a trace line for the old row's key
 * and always returns 0.
 * NOTE(review): nothing from pNewSub is copied into pOldSub here — confirm that the
 * in-memory object does not need refreshing on update.
 */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  (void)pSdb;
  (void)pNewSub;
  mTrace("subscribe:%s, perform update action", pOldSub->key);
  return 0;
}

L
Liu Jicong 已提交
690
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
S
Shengliang Guan 已提交
691
  int32_t tlen = strlen(cgroup);
L
Liu Jicong 已提交
692
  memcpy(key, cgroup, tlen);
L
Liu Jicong 已提交
693
  key[tlen] = TMQ_SEPARATOR;
L
Liu Jicong 已提交
694
  strcpy(key + tlen + 1, topicName);
L
Liu Jicong 已提交
695
  return 0;
L
Liu Jicong 已提交
696 697
}

L
Liu Jicong 已提交
698
/*
 * Acquire the subscription of `topicName` by consumer group `cgroup` from the SDB.
 *
 * Returns the acquired object, to be released with mndReleaseSubscribe(). On a miss it
 * returns NULL and sets terrno to TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST.
 */
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
  char subKey[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(subKey, cgroup, topicName);

  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, subKey);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
709 710 711 712
/*
 * Acquire a subscription by its already-built key ("<cgroup><TMQ_SEPARATOR><topic>").
 *
 * Returns the acquired object, to be released with mndReleaseSubscribe(). On a miss it
 * returns NULL and sets terrno to TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST.
 */
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
718 719 720 721 722
/* Release a subscription previously obtained from mndAcquireSubscribe*() back to the SDB. */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  sdbRelease(pMnode->pSdb, pSub);
}

S
Shengliang Guan 已提交
723 724
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
  SMnode         *pMnode = pMsg->pNode;
L
Liu Jicong 已提交
725 726 727 728
  char           *msgStr = pMsg->rpcMsg.pCont;
  SCMSubscribeReq subscribe;
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
  int64_t consumerId = subscribe.consumerId;
L
Liu Jicong 已提交
729
  char   *cgroup = subscribe.consumerGroup;
L
Liu Jicong 已提交
730 731

  SArray *newSub = subscribe.topicNames;
S
Shengliang Guan 已提交
732
  int32_t newTopicNum = subscribe.topicNum;
L
Liu Jicong 已提交
733 734 735 736

  taosArraySortString(newSub, taosArrayCompareString);

  SArray *oldSub = NULL;
S
Shengliang Guan 已提交
737
  int32_t oldTopicNum = 0;
L
Liu Jicong 已提交
738
  bool    createConsumer = false;
L
Liu Jicong 已提交
739 740 741 742
  // create consumer if not exist
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    // create consumer
L
Liu Jicong 已提交
743 744
    pConsumer = mndCreateConsumer(consumerId, cgroup);
    createConsumer = true;
L
Liu Jicong 已提交
745
  } else {
L
Liu Jicong 已提交
746
    pConsumer->epoch++;
L
Liu Jicong 已提交
747
    oldSub = pConsumer->currentTopics;
L
Liu Jicong 已提交
748
  }
L
Liu Jicong 已提交
749
  pConsumer->currentTopics = newSub;
L
Liu Jicong 已提交
750 751 752 753 754

  if (oldSub != NULL) {
    oldTopicNum = taosArrayGetSize(oldSub);
  }

S
Shengliang Guan 已提交
755
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
L
Liu Jicong 已提交
756 757 758 759 760
  if (pTrans == NULL) {
    // TODO: free memory
    return -1;
  }

S
Shengliang Guan 已提交
761
  int32_t i = 0, j = 0;
L
Liu Jicong 已提交
762
  while (i < newTopicNum || j < oldTopicNum) {
L
Liu Jicong 已提交
763 764
    char *newTopicName = NULL;
    char *oldTopicName = NULL;
L
Liu Jicong 已提交
765 766
    if (i >= newTopicNum) {
      // encode unset topic msg to all vnodes related to that topic
L
Liu Jicong 已提交
767
      oldTopicName = taosArrayGetP(oldSub, j);
L
Liu Jicong 已提交
768 769
      j++;
    } else if (j >= oldTopicNum) {
L
Liu Jicong 已提交
770
      newTopicName = taosArrayGetP(newSub, i);
L
Liu Jicong 已提交
771 772
      i++;
    } else {
L
Liu Jicong 已提交
773
      newTopicName = taosArrayGetP(newSub, i);
L
Liu Jicong 已提交
774
      oldTopicName = taosArrayGetP(oldSub, j);
L
Liu Jicong 已提交
775

S
Shengliang Guan 已提交
776
      int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName);
L
Liu Jicong 已提交
777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792
      if (comp == 0) {
        // do nothing
        oldTopicName = newTopicName = NULL;
        i++;
        j++;
        continue;
      } else if (comp < 0) {
        oldTopicName = NULL;
        i++;
      } else {
        newTopicName = NULL;
        j++;
      }
    }

    if (oldTopicName != NULL) {
L
Liu Jicong 已提交
793 794 795 796 797
      ASSERT(newTopicName == NULL);

      // cancel subscribe of old topic
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName);
      ASSERT(pSub);
S
Shengliang Guan 已提交
798 799
      int32_t csz = taosArrayGetSize(pSub->consumers);
      for (int32_t ci = 0; ci < csz; ci++) {
L
Liu Jicong 已提交
800 801
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci);
        if (pSubConsumer->consumerId == consumerId) {
S
Shengliang Guan 已提交
802 803
          int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
          for (int32_t vgi = 0; vgi < vgsz; vgi++) {
L
Liu Jicong 已提交
804
            SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
L
Liu Jicong 已提交
805
            mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp, oldTopicName);
L
Liu Jicong 已提交
806
            taosArrayPush(pSub->unassignedVg, pConsumerEp);
L
Liu Jicong 已提交
807
          }
L
Liu Jicong 已提交
808
          taosArrayRemove(pSub->consumers, ci);
L
Liu Jicong 已提交
809
          break;
L
Liu Jicong 已提交
810 811
        }
      }
L
Liu Jicong 已提交
812 813
      char *oldTopicNameDup = strdup(oldTopicName);
      taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup);
L
Liu Jicong 已提交
814 815
      atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
      /*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/
L
Liu Jicong 已提交
816 817 818 819 820
    } else if (newTopicName != NULL) {
      ASSERT(oldTopicName == NULL);

      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
      if (pTopic == NULL) {
L
Liu Jicong 已提交
821
        mError("topic being subscribed not exist: %s", newTopicName);
L
Liu Jicong 已提交
822 823 824
        continue;
      }

L
Liu Jicong 已提交
825 826
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName);
      bool             createSub = false;
L
Liu Jicong 已提交
827
      if (pSub == NULL) {
L
Liu Jicong 已提交
828 829
        mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup,
               newTopicName);
L
Liu Jicong 已提交
830 831
        pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
        createSub = true;
L
Liu Jicong 已提交
832 833

        mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg);
L
Liu Jicong 已提交
834
      }
L
Liu Jicong 已提交
835 836 837 838 839 840 841 842 843 844 845 846

      SMqSubConsumer mqSubConsumer;
      mqSubConsumer.consumerId = consumerId;
      mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp));
      taosArrayPush(pSub->consumers, &mqSubConsumer);

      // if have un assigned vg, assign one to the consumer
      if (taosArrayGetSize(pSub->unassignedVg) > 0) {
        SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
        pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
        pConsumerEp->consumerId = consumerId;
        taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
847
        if (pConsumerEp->oldConsumerId == -1) {
848
          mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, newTopicName,
L
Liu Jicong 已提交
849
                pConsumerEp->consumerId);
850 851 852 853
          mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
        } else {
          mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
        }
L
Liu Jicong 已提交
854
        // to trigger rebalance at once, do not set status active
855
        /*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
L
Liu Jicong 已提交
856
      }
L
Liu Jicong 已提交
857

L
Liu Jicong 已提交
858
      SSdbRaw *pRaw = mndSubActionEncode(pSub);
L
Liu Jicong 已提交
859
      sdbSetRawStatus(pRaw, SDB_STATUS_READY);
L
Liu Jicong 已提交
860
      mndTransAppendRedolog(pTrans, pRaw);
L
Liu Jicong 已提交
861

L
Liu Jicong 已提交
862 863
      if (!createSub) mndReleaseSubscribe(pMnode, pSub);
      mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
864 865 866
    }
  }

L
Liu Jicong 已提交
867
  /*if (oldSub) taosArrayDestroyEx(oldSub, (void (*)(void *))taosMemoryFree);*/
L
Liu Jicong 已提交
868 869 870 871 872 873 874

  // persist consumerObj
  SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
  sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
  mndTransAppendRedolog(pTrans, pConsumerRaw);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
L
Liu Jicong 已提交
875
    mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
876
    mndTransDrop(pTrans);
L
Liu Jicong 已提交
877
    if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
L
Liu Jicong 已提交
878 879 880 881
    return -1;
  }

  mndTransDrop(pTrans);
L
Liu Jicong 已提交
882
  if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
L
Liu Jicong 已提交
883
  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
L
Liu Jicong 已提交
884 885
}

S
Shengliang Guan 已提交
886
/*
 * Forward a subscribe-internal response to the transaction layer (mndTransProcessRsp).
 * The forwarder's result is not propagated; this handler always returns 0.
 */
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) {
  (void)mndTransProcessRsp(pRsp);
  return 0;
}
L
Liu Jicong 已提交
890 891 892 893 894

/* Abort an in-progress SDB iteration identified by `pIter` (delegates to sdbCancelFetch). */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}