mndSubscribe.c 40.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndSubscribe.h"
L
Liu Jicong 已提交
18 19 20 21
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
22
#include "mndOffset.h"
L
Liu Jicong 已提交
23 24 25 26 27 28 29 30 31
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

L
Liu Jicong 已提交
32
#define MND_SUBSCRIBE_VER_NUMBER   1
L
Liu Jicong 已提交
33 34
#define MND_SUBSCRIBE_RESERVE_SIZE 64

L
Liu Jicong 已提交
35
#define MND_SUBSCRIBE_REBALANCE_CNT 3
L
Liu Jicong 已提交
36

L
Liu Jicong 已提交
37 38 39 40 41 42
enum {
  MQ_SUBSCRIBE_STATUS__ACTIVE = 1,
  MQ_SUBSCRIBE_STATUS__DELETED,
};

static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName);
L
Liu Jicong 已提交
43

L
Liu Jicong 已提交
44 45 46 47 48 49 50 51 52 53
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);

static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
54
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
55
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
56
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
57
static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
58

S
Shengliang Guan 已提交
59 60
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
                                      const SMqConsumerEp *pConsumerEp);
L
Liu Jicong 已提交
61

L
Liu Jicong 已提交
62 63
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp);

S
Shengliang Guan 已提交
64
static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub);
L
Liu Jicong 已提交
65 66 67 68 69 70 71 72 73 74 75

int32_t mndInitSubscribe(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
                     .keyType = SDB_KEY_BINARY,
                     .encodeFp = (SdbEncodeFp)mndSubActionEncode,
                     .decodeFp = (SdbDecodeFp)mndSubActionDecode,
                     .insertFp = (SdbInsertFp)mndSubActionInsert,
                     .updateFp = (SdbUpdateFp)mndSubActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndSubActionDelete};

  mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
L
Liu Jicong 已提交
76
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
L
Liu Jicong 已提交
77
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp);
L
Liu Jicong 已提交
78
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
L
Liu Jicong 已提交
79
  mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
L
Liu Jicong 已提交
80
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);
L
Liu Jicong 已提交
81 82 83
  return sdbSetTable(pMnode->pSdb, table);
}

L
Liu Jicong 已提交
84
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *cgroup) {
L
Liu Jicong 已提交
85 86 87 88 89
  SMqSubscribeObj *pSub = tNewSubscribeObj();
  if (pSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
L
Liu Jicong 已提交
90
  char *key = mndMakeSubscribeKey(cgroup, pTopic->name);
L
Liu Jicong 已提交
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
  if (key == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tDeleteSMqSubscribeObj(pSub);
    free(pSub);
    return NULL;
  }
  strcpy(pSub->key, key);
  free(key);

  if (mndInitUnassignedVg(pMnode, pTopic, pSub) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tDeleteSMqSubscribeObj(pSub);
    free(pSub);
    return NULL;
  }
  // TODO: disable alter subscribed table
  return pSub;
}

L
Liu Jicong 已提交
110
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
L
Liu Jicong 已提交
111
  SMqMVRebReq req = {
L
Liu Jicong 已提交
112 113 114
      .vgId = pConsumerEp->vgId,
      .oldConsumerId = pConsumerEp->oldConsumerId,
      .newConsumerId = pConsumerEp->consumerId,
L
Liu Jicong 已提交
115 116
  };

L
Liu Jicong 已提交
117
  int32_t tlen = tEncodeSMqMVRebReq(NULL, &req);
L
Liu Jicong 已提交
118 119 120 121 122 123 124 125 126
  void   *buf = malloc(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(pConsumerEp->vgId);
L
Liu Jicong 已提交
127

L
Liu Jicong 已提交
128
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
L
Liu Jicong 已提交
129
  tEncodeSMqMVRebReq(&abuf, &req);
L
Liu Jicong 已提交
130

L
Liu Jicong 已提交
131 132 133 134 135 136
  *pBuf = buf;
  *pLen = tlen;

  return 0;
}

L
Liu Jicong 已提交
137
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
138
  ASSERT(pConsumerEp->oldConsumerId != -1);
L
Liu Jicong 已提交
139 140 141

  void   *buf;
  int32_t tlen;
L
Liu Jicong 已提交
142
  if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp) < 0) {
L
Liu Jicong 已提交
143 144 145
    return -1;
  }

S
Shengliang Guan 已提交
146 147 148
  int32_t vgId = pConsumerEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);

L
Liu Jicong 已提交
149 150 151 152
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
153
  action.msgType = TDMT_VND_MQ_REB;
L
Liu Jicong 已提交
154 155 156 157 158 159 160 161 162 163 164 165

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    free(buf);
    return -1;
  }

  return 0;
}

static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
  SMqSetCVgReq req = {0};
L
Liu Jicong 已提交
166
  req.consumerId = pConsumerEp->consumerId;
L
Liu Jicong 已提交
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191

  int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
  void   *buf = malloc(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(pConsumerEp->vgId);
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSMqSetCVgReq(&abuf, &req);
  *pBuf = buf;
  *pLen = tlen;
  return 0;
}

static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
  void   *buf;
  int32_t tlen;
  if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) {
    return -1;
  }

S
Shengliang Guan 已提交
192 193 194
  int32_t vgId = pConsumerEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);

L
Liu Jicong 已提交
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_SET_CONN;

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    free(buf);
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
#if 0
static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg) {
  SMnode             *pMnode = pMsg->pMnode;
  uint8_t            *str = pMsg->rpcMsg.pCont;
  SMqCMResetOffsetReq req;

  SCoder decoder;
  tCoderInit(&decoder, TD_LITTLE_ENDIAN, str, pMsg->rpcMsg.contLen, TD_DECODER);
  tDecodeSMqCMResetOffsetReq(&decoder, &req);

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return -1;
  }

  for (int32_t i = 0; i < req.num; i++) {
    SMqOffset    *pOffset = &req.offsets[i];
    SMqVgOffsets *pVgOffset = taosHashGet(pHash, &pOffset->vgId, sizeof(int32_t));
    if (pVgOffset == NULL) {
      pVgOffset = malloc(sizeof(SMqVgOffsets));
      if (pVgOffset == NULL) {
        return -1;
      }
      pVgOffset->offsets = taosArrayInit(0, sizeof(void *));
      taosArrayPush(pVgOffset->offsets, &pOffset);
    }
    taosHashPut(pHash, &pOffset->vgId, sizeof(int32_t), &pVgOffset, sizeof(void *));
  }

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
  if (pTrans == NULL) {
    mError("mq-reset-offset: failed since %s", terrstr());
    return -1;
  }

  return 0;
}
#endif

L
Liu Jicong 已提交
249 250
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
  SMnode           *pMnode = pMsg->pMnode;
L
Liu Jicong 已提交
251
  SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
L
Liu Jicong 已提交
252
  SMqCMGetSubEpRsp  rsp = {0};
L
Liu Jicong 已提交
253
  int64_t           consumerId = be64toh(pReq->consumerId);
L
Liu Jicong 已提交
254
  int32_t           epoch = ntohl(pReq->epoch);
L
Liu Jicong 已提交
255 256 257

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId);
  if (pConsumer == NULL) {
L
Liu Jicong 已提交
258
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
L
Liu Jicong 已提交
259 260 261 262
    return -1;
  }
  ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);

L
Liu Jicong 已提交
263
  // TODO
L
Liu Jicong 已提交
264
  int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
265
  mTrace("try to get sub ep, old val: %d", hbStatus);
L
Liu Jicong 已提交
266 267 268 269 270
  atomic_store_32(&pConsumer->hbStatus, 0);
  /*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
  /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
  /*sdbWrite(pMnode->pSdb, pConsumerRaw);*/

L
Liu Jicong 已提交
271 272
  strcpy(rsp.cgroup, pReq->cgroup);
  rsp.consumerId = consumerId;
L
Liu Jicong 已提交
273
  rsp.epoch = pConsumer->epoch;
L
Liu Jicong 已提交
274
  if (epoch != rsp.epoch) {
L
Liu Jicong 已提交
275
    mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch);
L
Liu Jicong 已提交
276
    SArray *pTopics = pConsumer->currentTopics;
S
Shengliang Guan 已提交
277
    int32_t sz = taosArrayGetSize(pTopics);
L
Liu Jicong 已提交
278
    rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
S
Shengliang Guan 已提交
279
    for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
280 281 282
      char            *topicName = taosArrayGetP(pTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
      ASSERT(pSub);
S
Shengliang Guan 已提交
283
      int32_t csz = taosArrayGetSize(pSub->consumers);
L
Liu Jicong 已提交
284
      // TODO: change to bsearch
S
Shengliang Guan 已提交
285
      for (int32_t j = 0; j < csz; j++) {
L
Liu Jicong 已提交
286 287
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
        if (consumerId == pSubConsumer->consumerId) {
S
Shengliang Guan 已提交
288
          int32_t       vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
L
Liu Jicong 已提交
289
          SMqSubTopicEp topicEp;
L
Liu Jicong 已提交
290
          strcpy(topicEp.topic, topicName);
L
Liu Jicong 已提交
291
          topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
S
Shengliang Guan 已提交
292
          for (int32_t k = 0; k < vgsz; k++) {
L
Liu Jicong 已提交
293
            char           offsetKey[TSDB_PARTITION_KEY_LEN];
L
Liu Jicong 已提交
294
            SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
L
Liu Jicong 已提交
295 296 297 298 299 300 301
            SMqSubVgEp     vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId, .offset = -1};
            mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId);
            SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
            if (pOffsetObj != NULL) {
              vgEp.offset = pOffsetObj->offset;
              mndReleaseOffset(pMnode, pOffsetObj);
            }
L
Liu Jicong 已提交
302 303 304 305 306
            taosArrayPush(topicEp.vgs, &vgEp);
          }
          taosArrayPush(rsp.topics, &topicEp);
          break;
        }
L
Liu Jicong 已提交
307
      }
L
Liu Jicong 已提交
308
      mndReleaseSubscribe(pMnode, pSub);
L
Liu Jicong 已提交
309 310 311
    }
  }
  int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
L
Liu Jicong 已提交
312
  void   *buf = rpcMallocCont(tlen);
L
Liu Jicong 已提交
313 314 315 316 317 318
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  void *abuf = buf;
  tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
L
Liu Jicong 已提交
319
  tDeleteSMqCMGetSubEpRsp(&rsp);
L
Liu Jicong 已提交
320
  mndReleaseConsumer(pMnode, pConsumer);
L
Liu Jicong 已提交
321 322 323 324 325
  pMsg->pCont = buf;
  pMsg->contLen = tlen;
  return 0;
}

L
Liu Jicong 已提交
326
static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
S
Shengliang Guan 已提交
327
  int32_t i = 0;
L
Liu Jicong 已提交
328 329 330 331
  while (key[i] != ':') {
    i++;
  }
  key[i] = 0;
L
Liu Jicong 已提交
332
  *cgroup = strdup(key);
L
Liu Jicong 已提交
333
  key[i] = ':';
L
Liu Jicong 已提交
334
  *topic = strdup(&key[i + 1]);
L
Liu Jicong 已提交
335 336 337
  return 0;
}

L
Liu Jicong 已提交
338 339 340 341 342 343 344 345 346 347 348 349 350
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key));
  if (pRebSub == NULL) {
    pRebSub = tNewSMqRebSubscribe(key);
    if (pRebSub == NULL) {
      // TODO
      return NULL;
    }
    taosHashPut(pHash, key, strlen(key), pRebSub, sizeof(SMqRebSubscribe));
  }
  return pRebSub;
}

L
Liu Jicong 已提交
351
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
L
Liu Jicong 已提交
352 353 354 355 356 357 358
  SMnode            *pMnode = pMsg->pMnode;
  SSdb              *pSdb = pMnode->pSdb;
  SMqConsumerObj    *pConsumer;
  void              *pIter = NULL;
  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);

L
Liu Jicong 已提交
359 360 361
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;
362
    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
L
Liu Jicong 已提交
363 364 365 366
    if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) {
      int32_t old =
          atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
      if (old == MQ_CONSUMER_STATUS__ACTIVE) {
L
Liu Jicong 已提交
367
        // get all topics of that topic
S
Shengliang Guan 已提交
368 369
        int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
        for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
          char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
          char            *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
          SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
          taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
        }
      }
    }
    int32_t status = atomic_load_32(&pConsumer->status);
    if (status == MQ_CONSUMER_STATUS__INIT || status == MQ_CONSUMER_STATUS__MODIFY) {
      SArray *rebSubs;
      if (status == MQ_CONSUMER_STATUS__INIT) {
        rebSubs = pConsumer->currentTopics;
      } else {
        rebSubs = pConsumer->recentRemovedTopics;
      }
S
Shengliang Guan 已提交
385 386
      int32_t sz = taosArrayGetSize(rebSubs);
      for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
387 388 389 390 391 392 393 394
        char            *topic = taosArrayGetP(rebSubs, i);
        char            *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
        SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
        if (status == MQ_CONSUMER_STATUS__INIT) {
          taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
        } else if (status == MQ_CONSUMER_STATUS__MODIFY) {
          taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
        }
L
Liu Jicong 已提交
395
      }
L
Liu Jicong 已提交
396 397 398 399 400 401 402 403
      if (status == MQ_CONSUMER_STATUS__MODIFY) {
        int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics);
        for (int32_t i = 0; i < removeSz; i++) {
          char *topicName = taosArrayGet(pConsumer->recentRemovedTopics, i);
          free(topicName);
        }
        taosArrayClear(pConsumer->recentRemovedTopics);
      }
L
Liu Jicong 已提交
404 405
    }
  }
L
Liu Jicong 已提交
406 407 408 409 410 411 412 413
  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg)};
    pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
  }
L
Liu Jicong 已提交
414 415 416 417 418
  return 0;
}

static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
  SMnode            *pMnode = pMsg->pMnode;
S
Shengliang Guan 已提交
419 420
  SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont;
  STrans            *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
L
Liu Jicong 已提交
421 422 423 424 425 426 427 428 429 430 431 432 433
  void              *pIter = NULL;

  mInfo("mq rebalance start");

  while (1) {
    pIter = taosHashIterate(pReq->rebSubHash, pIter);
    if (pIter == NULL) break;
    SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);

    mInfo("mq rebalance subscription: %s", pSub->key);

    // remove lost consumer
S
Shengliang Guan 已提交
434
    for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
L
Liu Jicong 已提交
435 436 437 438
      int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i);

      mInfo("mq remove lost consumer %ld", lostConsumerId);

S
Shengliang Guan 已提交
439
      for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
L
Liu Jicong 已提交
440 441 442 443
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
        if (pSubConsumer->consumerId == lostConsumerId) {
          taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo);
          taosArrayPush(pSub->lostConsumers, pSubConsumer);
L
Liu Jicong 已提交
444 445 446 447 448 449 450 451
          taosArrayRemove(pSub->consumers, j);
          break;
        }
      }
    }

    // calculate rebalance
    int32_t consumerNum = taosArrayGetSize(pSub->consumers);
L
Liu Jicong 已提交
452 453 454
    if (consumerNum != 0) {
      int32_t vgNum = pSub->vgNum;
      int32_t vgEachConsumer = vgNum / consumerNum;
L
Liu Jicong 已提交
455 456 457 458
      int32_t imbalanceVg = vgNum % consumerNum;
      int32_t imbalanceSolved = 0;

      // iterate all consumers, set unassignedVgStash
S
Shengliang Guan 已提交
459
      for (int32_t i = 0; i < consumerNum; i++) {
L
Liu Jicong 已提交
460
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
S
Shengliang Guan 已提交
461 462
        int32_t         vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
        int32_t         vgThisConsumerAfterRb;
L
Liu Jicong 已提交
463 464 465 466
        if (i < imbalanceVg)
          vgThisConsumerAfterRb = vgEachConsumer + 1;
        else
          vgThisConsumerAfterRb = vgEachConsumer;
L
Liu Jicong 已提交
467

L
Liu Jicong 已提交
468 469
        mInfo("mq consumer:%ld, connectted vgroup number change from %d to %d", pSubConsumer->consumerId,
              vgThisConsumerBeforeRb, vgThisConsumerAfterRb);
L
Liu Jicong 已提交
470

L
Liu Jicong 已提交
471
        while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
L
Liu Jicong 已提交
472 473 474
          SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
          ASSERT(pConsumerEp != NULL);
          ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
L
Liu Jicong 已提交
475
          taosArrayPush(pSub->unassignedVg, pConsumerEp);
L
Liu Jicong 已提交
476 477
        }

L
Liu Jicong 已提交
478 479 480 481 482
        SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
        int32_t         status = atomic_load_32(&pRebConsumer->status);
        if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
            (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
            (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
L
Liu Jicong 已提交
483 484 485
          if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {
            pRebConsumer->epoch++;
          }
L
Liu Jicong 已提交
486 487 488 489 490
          if (vgThisConsumerAfterRb != 0) {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
          } else {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
          }
L
Liu Jicong 已提交
491

L
Liu Jicong 已提交
492
          mInfo("mq consumer:%ld, status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status);
L
Liu Jicong 已提交
493

L
Liu Jicong 已提交
494 495 496 497 498
          SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
          sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
          mndTransAppendRedolog(pTrans, pConsumerRaw);
        }
        mndReleaseConsumer(pMnode, pRebConsumer);
L
Liu Jicong 已提交
499 500
      }

L
Liu Jicong 已提交
501
      // assign to vgroup
L
Liu Jicong 已提交
502
      if (taosArrayGetSize(pSub->unassignedVg) != 0) {
S
Shengliang Guan 已提交
503
        for (int32_t i = 0; i < consumerNum; i++) {
L
Liu Jicong 已提交
504
          SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
S
Shengliang Guan 已提交
505 506
          int32_t         vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
          int32_t         vgThisConsumerAfterRb;
L
Liu Jicong 已提交
507 508 509 510
          if (i < imbalanceVg)
            vgThisConsumerAfterRb = vgEachConsumer + 1;
          else
            vgThisConsumerAfterRb = vgEachConsumer;
L
Liu Jicong 已提交
511

L
Liu Jicong 已提交
512 513
          while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
            SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
L
Liu Jicong 已提交
514 515 516 517
            ASSERT(pConsumerEp != NULL);

            pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
            pConsumerEp->consumerId = pSubConsumer->consumerId;
L
Liu Jicong 已提交
518
            taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
L
Liu Jicong 已提交
519

520
            if (pConsumerEp->oldConsumerId == -1) {
L
Liu Jicong 已提交
521 522
              char *topic;
              char *cgroup;
523
              mndSplitSubscribeKey(pSub->key, &topic, &cgroup);
L
Liu Jicong 已提交
524
              SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
525

L
Liu Jicong 已提交
526 527
              mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic,
                    pConsumerEp->consumerId);
528 529 530 531 532 533

              mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
              mndReleaseTopic(pMnode, pTopic);
              free(topic);
              free(cgroup);
            } else {
L
Liu Jicong 已提交
534 535
              mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId,
                    pConsumerEp->oldConsumerId, pConsumerEp->consumerId);
L
Liu Jicong 已提交
536

537 538
              mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
            }
L
Liu Jicong 已提交
539 540 541
          }
        }
      }
L
Liu Jicong 已提交
542
      ASSERT(taosArrayGetSize(pSub->unassignedVg) == 0);
L
Liu Jicong 已提交
543 544 545 546 547 548 549 550 551 552

      // TODO: log rebalance statistics
      SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
      sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
      mndTransAppendRedolog(pTrans, pSubRaw);
    }
    mndReleaseSubscribe(pMnode, pSub);
  }
  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
553
    taosHashCleanup(pReq->rebSubHash);
L
Liu Jicong 已提交
554 555 556 557
    mndTransDrop(pTrans);
    return -1;
  }

L
Liu Jicong 已提交
558
  taosHashCleanup(pReq->rebSubHash);
L
Liu Jicong 已提交
559 560 561 562 563
  mndTransDrop(pTrans);
  return 0;
}

#if 0
L
Liu Jicong 已提交
564 565
      for (int32_t j = 0; j < consumerNum; j++) {
        bool            changed = false;
L
Liu Jicong 已提交
566 567 568 569 570 571 572 573 574
        bool            unfished = false;

        bool            canUseLeft = imbalanceSolved < imbalanceVg;
        bool            mustUseLeft = canUseLeft && (imbalanceVg - imbalanceSolved >= consumerNum - j);
        ASSERT(imbalanceVg - imbalanceSolved <= consumerNum - j);

        int32_t maxVg = vgEachConsumer + canUseLeft;
        int32_t minVg = vgEachConsumer + mustUseLeft;

L
Liu Jicong 已提交
575
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
L
Liu Jicong 已提交
576 577 578 579 580 581 582 583 584
        int32_t         vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
        int32_t         vgThisConsumerAfterRb;
        if (vgThisConsumerBeforeRb > maxVg) {
          vgThisConsumerAfterRb = maxVg; 
          imbalanceSolved++;
          changed = true;
        } else if (vgThisConsumerBeforeRb < minVg) {
          vgThisConsumerAfterRb = minVg;
          if (mustUseLeft) imbalanceSolved++;
L
Liu Jicong 已提交
585
          changed = true;
L
Liu Jicong 已提交
586 587 588 589 590 591 592
        } else {
          vgThisConsumerAfterRb = vgThisConsumerBeforeRb;
        }

        if (vgThisConsumerBeforeRb > vgThisConsumerAfterRb) {
          while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
            // put into unassigned
L
Liu Jicong 已提交
593 594
            SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
            ASSERT(pConsumerEp != NULL);
L
Liu Jicong 已提交
595
            ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
L
Liu Jicong 已提交
596 597
            taosArrayPush(unassignedVgStash, pConsumerEp);
          }
L
Liu Jicong 已提交
598 599

        } else if (vgThisConsumerBeforeRb < vgThisConsumerAfterRb) {
L
Liu Jicong 已提交
600
          // assign from unassigned
L
Liu Jicong 已提交
601
          while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
L
Liu Jicong 已提交
602 603
            // if no unassgined, save j
            if (taosArrayGetSize(unassignedVgStash) == 0) {
L
Liu Jicong 已提交
604 605
              taosArrayPush(unassignedConsumerIdx, &j);
              unfished = true;
L
Liu Jicong 已提交
606 607
              break;
            }
L
Liu Jicong 已提交
608
            // assign vg to consumer
L
Liu Jicong 已提交
609 610 611 612 613 614 615 616
            SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
            ASSERT(pConsumerEp != NULL);
            pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
            pConsumerEp->consumerId = pSubConsumer->consumerId;
            taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
            // build msg and persist into trans
          }
        }
L
Liu Jicong 已提交
617 618

        if (changed && !unfished) {
L
Liu Jicong 已提交
619 620
          SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
          pRebConsumer->epoch++;
L
Liu Jicong 已提交
621 622 623 624 625 626
          if (vgThisConsumerAfterRb != 0) {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
          } else {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
          }
          SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
L
Liu Jicong 已提交
627
          sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
L
Liu Jicong 已提交
628
          mndTransAppendRedolog(pTrans, pConsumerRaw);
L
Liu Jicong 已提交
629 630
          mndReleaseConsumer(pMnode, pRebConsumer);
          // TODO: save history
L
Liu Jicong 已提交
631 632 633
        }
      }

L
Liu Jicong 已提交
634 635 636
      for (int32_t j = 0; j < taosArrayGetSize(unassignedConsumerIdx); j++) {
        bool            canUseLeft = imbalanceSolved < imbalanceVg;
        int32_t         consumerIdx = *(int32_t *)taosArrayGet(unassignedConsumerIdx, j);
L
Liu Jicong 已提交
637
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, consumerIdx);
L
Liu Jicong 已提交
638 639 640 641 642
        if (canUseLeft) imbalanceSolved++;
        // must use
        int32_t vgThisConsumerAfterRb = taosArrayGetSize(pSubConsumer->vgInfo) + canUseLeft;
        while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer + canUseLeft) {
          // assign vg to consumer
L
Liu Jicong 已提交
643 644 645 646 647 648 649
          SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
          ASSERT(pConsumerEp != NULL);
          pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
          pConsumerEp->consumerId = pSubConsumer->consumerId;
          taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
          // build msg and persist into trans
        }
L
Liu Jicong 已提交
650 651 652 653 654 655 656 657
        SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
        pRebConsumer->epoch++;
        atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
        SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
        sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
        mndTransAppendRedolog(pTrans, pConsumerRaw);
        mndReleaseConsumer(pMnode, pRebConsumer);
        // TODO: save history
L
Liu Jicong 已提交
658
      }
L
Liu Jicong 已提交
659
#endif
L
Liu Jicong 已提交
660 661 662

#if 0
    //update consumer status for the subscribption
S
Shengliang Guan 已提交
663
    for (int32_t i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
L
Liu Jicong 已提交
664 665
      SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
      int64_t        consumerId = pCEp->consumerId;
L
Liu Jicong 已提交
666 667 668 669 670
      if (pCEp->status != -1) {
        int32_t consumerHbStatus = atomic_fetch_add_32(&pCEp->consumerHbStatus, 1);
        if (consumerHbStatus < MND_SUBSCRIBE_REBALANCE_CNT) {
          continue;
        }
L
Liu Jicong 已提交
671
        // put consumer into lostConsumer
L
Liu Jicong 已提交
672 673
        SMqConsumerEp* lostConsumer = taosArrayPush(pSub->lostConsumer, pCEp);
        lostConsumer->qmsg = NULL;
L
Liu Jicong 已提交
674
        // put vg into unassigned
L
Liu Jicong 已提交
675 676 677 678 679
        taosArrayPush(pSub->unassignedVg, pCEp);
        // remove from assigned
        // TODO: swap with last one, reduce size and reset i
        taosArrayRemove(pSub->assigned, i);
        // remove from available consumer
S
Shengliang Guan 已提交
680
        for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
L
Liu Jicong 已提交
681
          if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) {
L
Liu Jicong 已提交
682 683 684 685 686
            taosArrayRemove(pSub->availConsumer, j);
            break;
          }
          // TODO: acquire consumer, set status to unavail
        }
L
Liu Jicong 已提交
687
#if 0
L
Liu Jicong 已提交
688 689
        SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId);
        pConsumer->epoch++;
L
Liu Jicong 已提交
690
        printf("current epoch %ld size %ld", pConsumer->epoch, pConsumer->topics->size);
L
Liu Jicong 已提交
691 692 693 694
        SSdbRaw* pRaw = mndConsumerActionEncode(pConsumer);
        sdbSetRawStatus(pRaw, SDB_STATUS_READY);
        sdbWriteNotFree(pMnode->pSdb, pRaw);
        mndReleaseConsumer(pMnode, pConsumer);
L
Liu Jicong 已提交
695
#endif
L
Liu Jicong 已提交
696 697
      }
    }
L
Liu Jicong 已提交
698 699 700 701 702 703 704 705
    // no available consumer, skip rebalance
    if (taosArrayGetSize(pSub->availConsumer) == 0) {
      continue;
    }
    taosArrayGet(pSub->availConsumer, 0);
    // rebalance condition1 : have unassigned vg
    // assign vg to a consumer, trying to find the least assigned one
    if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) {
L
Liu Jicong 已提交
706 707 708 709 710 711
      char *topic = NULL;
      char *cgroup = NULL;
      mndSplitSubscribeKey(pSub->key, &topic, &cgroup);

      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
L
Liu Jicong 已提交
712
      for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
713
        int64_t        consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx);
L
Liu Jicong 已提交
714 715
        pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);

L
Liu Jicong 已提交
716
        SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
L
Liu Jicong 已提交
717
        pCEp->oldConsumerId = pCEp->consumerId;
L
Liu Jicong 已提交
718 719 720
        pCEp->consumerId = consumerId;
        taosArrayPush(pSub->assigned, pCEp);

L
Liu Jicong 已提交
721
        SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
722
        pConsumer->epoch++;
L
Liu Jicong 已提交
723 724 725
        SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);
        sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
        sdbWrite(pMnode->pSdb, pConsumerRaw);
L
Liu Jicong 已提交
726 727
        mndReleaseConsumer(pMnode, pConsumer);

L
Liu Jicong 已提交
728 729 730
        void* msg;
        int32_t msgLen;
        mndBuildRebalanceMsg(&msg, &msgLen, pTopic, pCEp, cgroup, topic);
L
Liu Jicong 已提交
731 732 733

        // persist msg
        STransAction action = {0};
734
        action.epSet = pCEp->epSet;
L
Liu Jicong 已提交
735 736
        action.pCont = msg;
        action.contLen = sizeof(SMsgHead) + msgLen;
L
Liu Jicong 已提交
737 738 739
        action.msgType = TDMT_VND_MQ_SET_CONN;
        mndTransAppendRedoAction(pTrans, &action);

L
Liu Jicong 已提交
740
        // persist data
L
Liu Jicong 已提交
741
        SSdbRaw *pRaw = mndSubActionEncode(pSub);
L
Liu Jicong 已提交
742
        sdbSetRawStatus(pRaw, SDB_STATUS_READY);
L
Liu Jicong 已提交
743 744
        mndTransAppendRedolog(pTrans, pRaw);
      }
L
Liu Jicong 已提交
745

L
Liu Jicong 已提交
746 747 748
      if (mndTransPrepare(pMnode, pTrans) != 0) {
        mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
      }
L
Liu Jicong 已提交
749
      mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
750
      mndTransDrop(pTrans);
L
Liu Jicong 已提交
751 752
      tfree(topic);
      tfree(cgroup);
L
Liu Jicong 已提交
753
    }
L
Liu Jicong 已提交
754
    // rebalance condition2 : imbalance assignment
L
Liu Jicong 已提交
755 756 757
  }
  return 0;
}
L
Liu Jicong 已提交
758
#endif
L
Liu Jicong 已提交
759

S
Shengliang Guan 已提交
760
static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
L
Liu Jicong 已提交
761 762
  SSdb      *pSdb = pMnode->pSdb;
  SVgObj    *pVgroup = NULL;
L
Liu Jicong 已提交
763
  SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
L
Liu Jicong 已提交
764
  SArray    *pArray = NULL;
L
Liu Jicong 已提交
765 766
  SArray    *inner = taosArrayGet(pDag->pSubplans, 0);
  SSubplan  *plan = taosArrayGetP(inner, 0);
L
Liu Jicong 已提交
767
  SArray    *unassignedVg = pSub->unassignedVg;
L
Liu Jicong 已提交
768

L
Liu Jicong 已提交
769 770 771 772
  void *pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;
S
Shengliang Guan 已提交
773 774 775 776
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }
L
Liu Jicong 已提交
777

L
Liu Jicong 已提交
778
    pSub->vgNum++;
L
Liu Jicong 已提交
779 780 781 782 783 784 785 786
    plan->execNode.nodeId = pVgroup->vgId;
    plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup);

    if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
      terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
      mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
      return -1;
    }
L
Liu Jicong 已提交
787

L
Liu Jicong 已提交
788 789 790
    SMqConsumerEp consumerEp = {0};
    consumerEp.status = 0;
    consumerEp.consumerId = -1;
L
Liu Jicong 已提交
791
    STaskInfo *pTaskInfo = taosArrayGet(pArray, 0);
L
Liu Jicong 已提交
792 793
    consumerEp.epSet = pTaskInfo->addr.epset;
    consumerEp.vgId = pTaskInfo->addr.nodeId;
L
Liu Jicong 已提交
794

L
Liu Jicong 已提交
795 796 797
    ASSERT(consumerEp.vgId == pVgroup->vgId);
    consumerEp.qmsg = strdup(pTaskInfo->msg->msg);
    taosArrayPush(unassignedVg, &consumerEp);
L
Liu Jicong 已提交
798
    // TODO: free taskInfo
L
Liu Jicong 已提交
799
    taosArrayDestroy(pArray);
L
Liu Jicong 已提交
800
  }
801

L
Liu Jicong 已提交
802
  /*qDestroyQueryDag(pDag);*/
803
  return 0;
L
Liu Jicong 已提交
804 805
}

S
Shengliang Guan 已提交
806 807
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
                                      const SMqConsumerEp *pConsumerEp) {
808
  ASSERT(pConsumerEp->oldConsumerId == -1);
L
Liu Jicong 已提交
809 810 811 812
  int32_t vgId = pConsumerEp->vgId;

  SMqSetCVgReq req = {
      .vgId = vgId,
L
Liu Jicong 已提交
813
      .consumerId = pConsumerEp->consumerId,
L
Liu Jicong 已提交
814 815 816 817 818 819 820 821 822 823 824 825 826 827
      .sql = pTopic->sql,
      .logicalPlan = pTopic->logicalPlan,
      .physicalPlan = pTopic->physicalPlan,
      .qmsg = pConsumerEp->qmsg,
  };

  strcpy(req.cgroup, cgroup);
  strcpy(req.topicName, pTopic->name);
  int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
  void   *buf = malloc(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
L
Liu Jicong 已提交
828

L
Liu Jicong 已提交
829
  SMsgHead *pMsgHead = (SMsgHead *)buf;
L
Liu Jicong 已提交
830

L
Liu Jicong 已提交
831 832
  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(vgId);
L
Liu Jicong 已提交
833

L
Liu Jicong 已提交
834 835
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSMqSetCVgReq(&abuf, &req);
L
Liu Jicong 已提交
836

S
Shengliang Guan 已提交
837 838
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);

L
Liu Jicong 已提交
839 840 841 842 843
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_SET_CONN;
L
Liu Jicong 已提交
844

L
Liu Jicong 已提交
845 846 847 848
  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    free(buf);
    return -1;
L
Liu Jicong 已提交
849 850 851 852 853 854 855
  }
  return 0;
}

void mndCleanupSubscribe(SMnode *pMnode) {}

static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
L
Liu Jicong 已提交
856
  terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
857
  void   *buf = NULL;
L
Liu Jicong 已提交
858
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
L
Liu Jicong 已提交
859
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
L
Liu Jicong 已提交
860 861 862 863

  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

L
Liu Jicong 已提交
864
  buf = malloc(tlen);
L
Liu Jicong 已提交
865
  if (buf == NULL) goto SUB_ENCODE_OVER;
L
Liu Jicong 已提交
866

L
Liu Jicong 已提交
867 868
  void *abuf = buf;
  tEncodeSubscribeObj(&abuf, pSub);
L
Liu Jicong 已提交
869 870 871 872 873 874 875

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

L
Liu Jicong 已提交
876 877
  terrno = TSDB_CODE_SUCCESS;

L
Liu Jicong 已提交
878
SUB_ENCODE_OVER:
L
Liu Jicong 已提交
879
  tfree(buf);
L
Liu Jicong 已提交
880
  if (terrno != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
881 882 883 884 885 886 887 888 889 890 891
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}

static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
892
  void *buf = NULL;
L
Liu Jicong 已提交
893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver != MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  int32_t  size = sizeof(SMqSubscribeObj);
  SSdbRow *pRow = sdbAllocRow(size);
  if (pRow == NULL) goto SUB_DECODE_OVER;

  SMqSubscribeObj *pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  int32_t dataPos = 0;
  int32_t tlen;
L
Liu Jicong 已提交
911
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
L
Liu Jicong 已提交
912
  buf = malloc(tlen + 1);
L
Liu Jicong 已提交
913 914 915 916 917 918 919 920
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub) == NULL) {
    goto SUB_DECODE_OVER;
  }

L
Liu Jicong 已提交
921 922
  terrno = TSDB_CODE_SUCCESS;

L
Liu Jicong 已提交
923
SUB_DECODE_OVER:
L
Liu Jicong 已提交
924
  tfree(buf);
L
Liu Jicong 已提交
925
  if (terrno != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
926 927 928 929 930 931 932 933 934 935 936 937 938 939 940
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }

  return pRow;
}

static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  mTrace("subscribe:%s, perform insert action", pSub->key);
  return 0;
}

static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  mTrace("subscribe:%s, perform delete action", pSub->key);
L
Liu Jicong 已提交
941
  tDeleteSMqSubscribeObj(pSub);
L
Liu Jicong 已提交
942 943 944 945 946 947 948 949
  return 0;
}

static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  mTrace("subscribe:%s, perform update action", pOldSub->key);
  return 0;
}

L
Liu Jicong 已提交
950
static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) {
L
Liu Jicong 已提交
951 952 953 954
  char *key = malloc(TSDB_SHOW_SUBQUERY_LEN);
  if (key == NULL) {
    return NULL;
  }
S
Shengliang Guan 已提交
955
  int32_t tlen = strlen(cgroup);
L
Liu Jicong 已提交
956 957 958 959 960 961
  memcpy(key, cgroup, tlen);
  key[tlen] = ':';
  strcpy(key + tlen + 1, topicName);
  return key;
}

L
Liu Jicong 已提交
962
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
L
Liu Jicong 已提交
963 964 965 966 967
  SSdb            *pSdb = pMnode->pSdb;
  char            *key = mndMakeSubscribeKey(cgroup, topicName);
  SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
  free(key);
  if (pSub == NULL) {
968
    terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
L
Liu Jicong 已提交
969 970 971 972
  }
  return pSub;
}

L
Liu Jicong 已提交
973 974 975 976
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
  if (pSub == NULL) {
977
    terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
L
Liu Jicong 已提交
978 979 980 981
  }
  return pSub;
}

L
Liu Jicong 已提交
982 983 984 985 986 987 988 989 990 991 992
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pSub);
}

static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
  SMnode         *pMnode = pMsg->pMnode;
  char           *msgStr = pMsg->rpcMsg.pCont;
  SCMSubscribeReq subscribe;
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
  int64_t consumerId = subscribe.consumerId;
L
Liu Jicong 已提交
993
  char   *cgroup = subscribe.consumerGroup;
L
Liu Jicong 已提交
994 995

  SArray *newSub = subscribe.topicNames;
S
Shengliang Guan 已提交
996
  int32_t newTopicNum = subscribe.topicNum;
L
Liu Jicong 已提交
997 998 999 1000

  taosArraySortString(newSub, taosArrayCompareString);

  SArray *oldSub = NULL;
S
Shengliang Guan 已提交
1001
  int32_t oldTopicNum = 0;
L
Liu Jicong 已提交
1002
  bool    createConsumer = false;
L
Liu Jicong 已提交
1003 1004 1005 1006
  // create consumer if not exist
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    // create consumer
L
Liu Jicong 已提交
1007 1008
    pConsumer = mndCreateConsumer(consumerId, cgroup);
    createConsumer = true;
L
Liu Jicong 已提交
1009
  } else {
L
Liu Jicong 已提交
1010
    pConsumer->epoch++;
L
Liu Jicong 已提交
1011
    oldSub = pConsumer->currentTopics;
L
Liu Jicong 已提交
1012
  }
L
Liu Jicong 已提交
1013
  pConsumer->currentTopics = newSub;
L
Liu Jicong 已提交
1014 1015 1016 1017 1018

  if (oldSub != NULL) {
    oldTopicNum = taosArrayGetSize(oldSub);
  }

S
Shengliang Guan 已提交
1019
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
L
Liu Jicong 已提交
1020 1021 1022 1023 1024
  if (pTrans == NULL) {
    // TODO: free memory
    return -1;
  }

S
Shengliang Guan 已提交
1025
  int32_t i = 0, j = 0;
L
Liu Jicong 已提交
1026
  while (i < newTopicNum || j < oldTopicNum) {
L
Liu Jicong 已提交
1027 1028
    char *newTopicName = NULL;
    char *oldTopicName = NULL;
L
Liu Jicong 已提交
1029 1030
    if (i >= newTopicNum) {
      // encode unset topic msg to all vnodes related to that topic
L
Liu Jicong 已提交
1031
      oldTopicName = taosArrayGetP(oldSub, j);
L
Liu Jicong 已提交
1032 1033
      j++;
    } else if (j >= oldTopicNum) {
L
Liu Jicong 已提交
1034
      newTopicName = taosArrayGetP(newSub, i);
L
Liu Jicong 已提交
1035 1036
      i++;
    } else {
L
Liu Jicong 已提交
1037
      newTopicName = taosArrayGetP(newSub, i);
L
Liu Jicong 已提交
1038
      oldTopicName = taosArrayGetP(oldSub, j);
L
Liu Jicong 已提交
1039

S
Shengliang Guan 已提交
1040
      int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName);
L
Liu Jicong 已提交
1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056
      if (comp == 0) {
        // do nothing
        oldTopicName = newTopicName = NULL;
        i++;
        j++;
        continue;
      } else if (comp < 0) {
        oldTopicName = NULL;
        i++;
      } else {
        newTopicName = NULL;
        j++;
      }
    }

    if (oldTopicName != NULL) {
L
Liu Jicong 已提交
1057 1058 1059 1060 1061
      ASSERT(newTopicName == NULL);

      // cancel subscribe of old topic
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName);
      ASSERT(pSub);
S
Shengliang Guan 已提交
1062 1063
      int32_t csz = taosArrayGetSize(pSub->consumers);
      for (int32_t ci = 0; ci < csz; ci++) {
L
Liu Jicong 已提交
1064 1065
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci);
        if (pSubConsumer->consumerId == consumerId) {
S
Shengliang Guan 已提交
1066 1067
          int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
          for (int32_t vgi = 0; vgi < vgsz; vgi++) {
L
Liu Jicong 已提交
1068 1069
            SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
            mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp);
L
Liu Jicong 已提交
1070
            taosArrayPush(pSub->unassignedVg, pConsumerEp);
L
Liu Jicong 已提交
1071
          }
L
Liu Jicong 已提交
1072
          taosArrayRemove(pSub->consumers, ci);
L
Liu Jicong 已提交
1073
          break;
L
Liu Jicong 已提交
1074 1075
        }
      }
L
Liu Jicong 已提交
1076 1077
      char *oldTopicNameDup = strdup(oldTopicName);
      taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup);
L
Liu Jicong 已提交
1078 1079
      atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
      /*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/
L
Liu Jicong 已提交
1080 1081 1082 1083 1084
    } else if (newTopicName != NULL) {
      ASSERT(oldTopicName == NULL);

      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
      if (pTopic == NULL) {
L
Liu Jicong 已提交
1085
        mError("topic being subscribed not exist: %s", newTopicName);
L
Liu Jicong 已提交
1086 1087 1088
        continue;
      }

L
Liu Jicong 已提交
1089 1090
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName);
      bool             createSub = false;
L
Liu Jicong 已提交
1091
      if (pSub == NULL) {
L
Liu Jicong 已提交
1092 1093 1094
        mDebug("create new subscription by consumer %ld, group: %s, topic %s", consumerId, cgroup, newTopicName);
        pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
        createSub = true;
L
Liu Jicong 已提交
1095 1096

        mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg);
L
Liu Jicong 已提交
1097
      }
L
Liu Jicong 已提交
1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109

      SMqSubConsumer mqSubConsumer;
      mqSubConsumer.consumerId = consumerId;
      mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp));
      taosArrayPush(pSub->consumers, &mqSubConsumer);

      // if have un assigned vg, assign one to the consumer
      if (taosArrayGetSize(pSub->unassignedVg) > 0) {
        SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
        pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
        pConsumerEp->consumerId = consumerId;
        taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
1110
        if (pConsumerEp->oldConsumerId == -1) {
L
Liu Jicong 已提交
1111 1112
          mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, newTopicName,
                pConsumerEp->consumerId);
1113 1114 1115 1116
          mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
        } else {
          mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
        }
L
Liu Jicong 已提交
1117
        // to trigger rebalance at once, do not set status active
1118
        /*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
L
Liu Jicong 已提交
1119
      }
L
Liu Jicong 已提交
1120

L
Liu Jicong 已提交
1121
      SSdbRaw *pRaw = mndSubActionEncode(pSub);
L
Liu Jicong 已提交
1122
      sdbSetRawStatus(pRaw, SDB_STATUS_READY);
L
Liu Jicong 已提交
1123
      mndTransAppendRedolog(pTrans, pRaw);
L
Liu Jicong 已提交
1124

L
Liu Jicong 已提交
1125 1126
      if (!createSub) mndReleaseSubscribe(pMnode, pSub);
      mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
1127 1128 1129
    }
  }

L
Liu Jicong 已提交
1130
  if (oldSub) taosArrayDestroyEx(oldSub, free);
L
Liu Jicong 已提交
1131 1132 1133 1134 1135 1136 1137

  // persist consumerObj
  SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
  sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
  mndTransAppendRedolog(pTrans, pConsumerRaw);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
L
Liu Jicong 已提交
1138
    mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
1139
    mndTransDrop(pTrans);
L
Liu Jicong 已提交
1140
    if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
L
Liu Jicong 已提交
1141 1142 1143 1144
    return -1;
  }

  mndTransDrop(pTrans);
L
Liu Jicong 已提交
1145
  if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
L
Liu Jicong 已提交
1146
  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
L
Liu Jicong 已提交
1147 1148
}

L
Liu Jicong 已提交
1149 1150 1151 1152
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);
  return 0;
}
L
Liu Jicong 已提交
1153 1154 1155 1156 1157

static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}