mndSubscribe.c 39.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndSubscribe.h"
L
Liu Jicong 已提交
18 19 20 21 22 23 24 25 26 27 28 29 30
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

L
Liu Jicong 已提交
31
#define MND_SUBSCRIBE_VER_NUMBER   1
L
Liu Jicong 已提交
32 33
#define MND_SUBSCRIBE_RESERVE_SIZE 64

L
Liu Jicong 已提交
34
#define MND_SUBSCRIBE_REBALANCE_CNT 3
L
Liu Jicong 已提交
35

L
Liu Jicong 已提交
36 37 38 39 40 41
/*
 * Subscription status constants (presumably the lifecycle state of an
 * SMqSubscribeObj).
 * NOTE(review): neither constant is referenced in the visible part of this
 * file -- confirm where they are used before changing the values.
 */
enum {
  MQ_SUBSCRIBE_STATUS__ACTIVE = 1,
  MQ_SUBSCRIBE_STATUS__DELETED,  // implicitly 2 (follows ACTIVE)
};

static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName);
L
Liu Jicong 已提交
42

L
Liu Jicong 已提交
43 44 45 46 47 48 49 50 51 52
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);

static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
53
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
54
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
55
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
56
static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
57

S
Shengliang Guan 已提交
58 59
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
                                      const SMqConsumerEp *pConsumerEp);
L
Liu Jicong 已提交
60

L
Liu Jicong 已提交
61 62
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp);

S
Shengliang Guan 已提交
63
static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub);
L
Liu Jicong 已提交
64 65 66 67 68 69 70 71 72 73 74

/*
 * Set up the subscribe module of the mnode: install the message handlers
 * handled in this file and register the SDB_SUBSCRIBE table callbacks.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  // Describe how subscribe objects are stored in the sdb.
  SSdbTable subTable = {0};
  subTable.sdbType = SDB_SUBSCRIBE;
  subTable.keyType = SDB_KEY_BINARY;
  subTable.encodeFp = (SdbEncodeFp)mndSubActionEncode;
  subTable.decodeFp = (SdbDecodeFp)mndSubActionDecode;
  subTable.insertFp = (SdbInsertFp)mndSubActionInsert;
  subTable.updateFp = (SdbUpdateFp)mndSubActionUpdate;
  subTable.deleteFp = (SdbDeleteFp)mndSubActionDelete;

  // Client request and vnode responses.
  mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp);

  // Timer tick, endpoint query and the rebalance message posted by the timer.
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);

  return sdbSetTable(pMnode->pSdb, subTable);
}

L
Liu Jicong 已提交
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
/*
 * Allocate a new subscribe object for (consumerGroup, pTopic), fill in its
 * key and initialise its unassigned vgroup list.
 *
 * Returns the new object, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when any step fails.
 */
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *consumerGroup) {
  SMqSubscribeObj *pNewSub = tNewSubscribeObj();
  if (pNewSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  char *subKey = mndMakeSubscribeKey(consumerGroup, pTopic->name);
  if (subKey == NULL) goto _FAILED;

  // The key is copied into the object, so the temporary string is released here.
  strcpy(pNewSub->key, subKey);
  free(subKey);

  if (mndInitUnassignedVg(pMnode, pTopic, pNewSub) < 0) goto _FAILED;

  // TODO: disable alter subscribed table
  return pNewSub;

_FAILED:
  // Every failure after the allocation is reported as out-of-memory.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  tDeleteSMqSubscribeObj(pNewSub);
  free(pNewSub);
  return NULL;
}

L
Liu Jicong 已提交
109
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
L
Liu Jicong 已提交
110
  SMqMVRebReq req = {
L
Liu Jicong 已提交
111 112 113
      .vgId = pConsumerEp->vgId,
      .oldConsumerId = pConsumerEp->oldConsumerId,
      .newConsumerId = pConsumerEp->consumerId,
L
Liu Jicong 已提交
114 115
  };

L
Liu Jicong 已提交
116
  int32_t tlen = tEncodeSMqMVRebReq(NULL, &req);
L
Liu Jicong 已提交
117 118 119 120 121 122 123 124 125
  void   *buf = malloc(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(pConsumerEp->vgId);
L
Liu Jicong 已提交
126

L
Liu Jicong 已提交
127
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
L
Liu Jicong 已提交
128
  tEncodeSMqMVRebReq(&abuf, &req);
L
Liu Jicong 已提交
129

L
Liu Jicong 已提交
130 131 132 133 134 135
  *pBuf = buf;
  *pLen = tlen;

  return 0;
}

L
Liu Jicong 已提交
136
/*
 * Build a TDMT_VND_MQ_REB request for the vgroup in pConsumerEp and append
 * it to pTrans as a redo action.
 *
 * pConsumerEp->oldConsumerId must not be -1 (asserted): this message moves a
 * vgroup from one consumer to another.
 *
 * Returns 0 on success, -1 on failure. On failure the message buffer is
 * released here; on success it is referenced by the appended action.
 */
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
  ASSERT(pConsumerEp->oldConsumerId != -1);

  void   *buf = NULL;
  int32_t tlen = 0;
  if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp) < 0) {
    return -1;
  }

  SVgObj *pVgObj = mndAcquireVgroup(pMnode, pConsumerEp->vgId);
  if (pVgObj == NULL) {
    // The vgroup is gone: without it there is no ep set to send to. Release
    // the buffer instead of leaking it and passing NULL further down.
    // NOTE(review): presumably mndAcquireVgroup has already set terrno -- confirm.
    free(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  // tlen covers the encoded request only, so the head is added back here.
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_REB;

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    free(buf);
    return -1;
  }

  return 0;
}

static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
  SMqSetCVgReq req = {0};
L
Liu Jicong 已提交
165
  req.consumerId = pConsumerEp->consumerId;
L
Liu Jicong 已提交
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190

  int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
  void   *buf = malloc(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(pConsumerEp->vgId);
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSMqSetCVgReq(&abuf, &req);
  *pBuf = buf;
  *pLen = tlen;
  return 0;
}

/*
 * Build a TDMT_VND_MQ_SET_CONN request from mndBuildCancelConnReq() for the
 * vgroup in pConsumerEp and append it to pTrans as a redo action.
 *
 * Returns 0 on success, -1 on failure. On failure the message buffer is
 * released here; on success it is referenced by the appended action.
 */
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
  void   *buf = NULL;
  int32_t tlen = 0;
  if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) {
    return -1;
  }

  SVgObj *pVgObj = mndAcquireVgroup(pMnode, pConsumerEp->vgId);
  if (pVgObj == NULL) {
    // The vgroup is gone: without it there is no ep set to send to. Release
    // the buffer instead of leaking it and passing NULL further down.
    // NOTE(review): presumably mndAcquireVgroup has already set terrno -- confirm.
    free(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  // tlen covers the encoded request only, so the head is added back here.
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_SET_CONN;

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    free(buf);
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
/*
 * Disabled draft of the reset-offset handler (compiled out by `#if 0`).
 *
 * As written it decodes a SMqCMResetOffsetReq, groups the offsets by vgId in
 * a local hash, creates a rollback-policy transaction and returns 0 without
 * using either of them.
 *
 * NOTE(review) before re-enabling:
 *  - an offset is pushed only when its vgId is first seen; later offsets of
 *    the same vgId are not added to the per-vgroup array;
 *  - pHash, the per-vgroup entries and pTrans are never released, including
 *    on the error returns;
 *  - mndTransCreate() is called with three arguments here, while the live
 *    code in this file passes four.
 */
#if 0
static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg) {
  SMnode             *pMnode = pMsg->pMnode;
  uint8_t            *str = pMsg->rpcMsg.pCont;
  SMqCMResetOffsetReq req;

  SCoder decoder;
  tCoderInit(&decoder, TD_LITTLE_ENDIAN, str, pMsg->rpcMsg.contLen, TD_DECODER);
  tDecodeSMqCMResetOffsetReq(&decoder, &req);

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return -1;
  }

  // Bucket the requested offsets by vgroup id.
  for (int32_t i = 0; i < req.num; i++) {
    SMqOffset    *pOffset = &req.offsets[i];
    SMqVgOffsets *pVgOffset = taosHashGet(pHash, &pOffset->vgId, sizeof(int32_t));
    if (pVgOffset == NULL) {
      pVgOffset = malloc(sizeof(SMqVgOffsets));
      if (pVgOffset == NULL) {
        return -1;
      }
      pVgOffset->offsets = taosArrayInit(0, sizeof(void *));
      taosArrayPush(pVgOffset->offsets, &pOffset);
    }
    taosHashPut(pHash, &pOffset->vgId, sizeof(int32_t), &pVgOffset, sizeof(void *));
  }

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
  if (pTrans == NULL) {
    mError("mq-reset-offset: failed since %s", terrstr());
    return -1;
  }

  return 0;
}
#endif

L
Liu Jicong 已提交
248 249
/*
 * Handle TDMT_MND_GET_SUB_EP: a consumer asks for its current vgroup
 * endpoints.
 *
 * The request also resets the consumer's hbStatus counter to 0 (the MQ timer
 * increments it). When the epoch sent by the consumer differs from the
 * server-side epoch, the response lists, for every topic in the consumer's
 * currentTopics, the vgroups assigned to this consumer; otherwise the
 * response carries no topic list.
 *
 * Returns 0 with pMsg->pCont / pMsg->contLen set to the encoded response, or
 * -1 on failure. The consumer reference and the temporary response object
 * are released on every path once the consumer has been acquired.
 */
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
  SMnode           *pMnode = pMsg->pMnode;
  SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
  SMqCMGetSubEpRsp  rsp = {0};
  int64_t           consumerId = be64toh(pReq->consumerId);
  int32_t           epoch = ntohl(pReq->epoch);
  int32_t           code = -1;
  int32_t           tlen = 0;
  void             *buf = NULL;
  void             *abuf = NULL;

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }
  ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);

  // TODO
  // Any request from the consumer counts as a heartbeat: reset the counter.
  int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
  mTrace("try to get sub ep, old val: %d", hbStatus);
  atomic_store_32(&pConsumer->hbStatus, 0);

  strcpy(rsp.cgroup, pReq->cgroup);
  rsp.consumerId = consumerId;
  rsp.epoch = pConsumer->epoch;

  if (epoch != rsp.epoch) {
    mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch);
    SArray *pTopics = pConsumer->currentTopics;
    int32_t sz = taosArrayGetSize(pTopics);
    rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    for (int32_t i = 0; i < sz; i++) {
      char            *topicName = taosArrayGetP(pTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
      if (pSub == NULL) {
        // NOTE(review): presumably mndAcquireSubscribe has set terrno -- confirm.
        goto _OVER;
      }

      int32_t csz = taosArrayGetSize(pSub->consumers);
      // TODO: change to bsearch
      for (int32_t j = 0; j < csz; j++) {
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
        if (consumerId != pSubConsumer->consumerId) continue;

        int32_t       vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
        SMqSubTopicEp topicEp = {0};
        strcpy(topicEp.topic, topicName);
        topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
        if (topicEp.vgs == NULL) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          mndReleaseSubscribe(pMnode, pSub);
          goto _OVER;
        }

        for (int32_t k = 0; k < vgsz; k++) {
          SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);

          SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId};
          taosArrayPush(topicEp.vgs, &vgEp);
        }
        // topicEp (including its vgs array) is now referenced from rsp.topics.
        taosArrayPush(rsp.topics, &topicEp);
        break;
      }
      mndReleaseSubscribe(pMnode, pSub);
    }
  }

  // Size-only pass, then encode into an rpc buffer handed back via pMsg.
  tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
  buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  abuf = buf;
  tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);

  pMsg->pCont = buf;
  pMsg->contLen = tlen;
  code = 0;

_OVER:
  // Shared exit: the response object and the consumer reference are released
  // on success and on every failure after the consumer was acquired.
  tDeleteSMqCMGetSubEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  return code;
}

L
Liu Jicong 已提交
319
/*
 * Split a subscribe key of the form "<cgroup>:<topic>" at the first ':'.
 *
 * key     NUL-terminated key; it is only read, never modified.
 * topic   receives a malloc'ed copy of the text after the first ':'.
 * cgroup  receives a malloc'ed copy of the text before the first ':'.
 *
 * Returns 0 on success; the caller must free() both strings.
 * Returns -1 when the key has no ':' separator or when memory cannot be
 * allocated; in that case *topic and *cgroup are both set to NULL.
 */
static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
  *topic = NULL;
  *cgroup = NULL;

  // strchr stops at the terminating NUL, so a malformed key cannot make the
  // scan run off the end of the buffer.
  const char *sep = strchr(key, ':');
  if (sep == NULL) {
    return -1;
  }

  size_t cgroupLen = (size_t)(sep - key);
  size_t topicLen = strlen(sep + 1);

  char *cgroupCopy = malloc(cgroupLen + 1);
  char *topicCopy = malloc(topicLen + 1);
  if (cgroupCopy == NULL || topicCopy == NULL) {
    free(cgroupCopy);
    free(topicCopy);
    return -1;
  }

  memcpy(cgroupCopy, key, cgroupLen);
  cgroupCopy[cgroupLen] = '\0';
  memcpy(topicCopy, sep + 1, topicLen + 1);  // includes the terminating NUL

  *cgroup = cgroupCopy;
  *topic = topicCopy;
  return 0;
}

L
Liu Jicong 已提交
331 332 333 334 335 336 337 338 339 340 341 342 343
/*
 * Look up the rebalance entry for `key` in pHash, creating and inserting a
 * new one when it does not exist yet.
 *
 * The hash keeps its own by-value copy of the entry (taosHashPut is given
 * sizeof(SMqRebSubscribe) bytes and readers cast the stored data directly to
 * SMqRebSubscribe *), so the returned pointer always refers to the copy
 * stored in the hash, never to a temporary.
 *
 * Returns NULL when the entry cannot be created or stored.
 */
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t keyLen = strlen(key);

  SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub != NULL) {
    return pRebSub;
  }

  SMqRebSubscribe *pNew = tNewSMqRebSubscribe(key);
  if (pNew == NULL) {
    // TODO
    return NULL;
  }

  taosHashPut(pHash, key, keyLen, pNew, sizeof(SMqRebSubscribe));
  // Only the struct shell is released: its members were copied bit-wise into
  // the hash entry, which now references them.
  // NOTE(review): assumes tNewSMqRebSubscribe allocates the shell with
  // malloc/calloc -- confirm.
  free(pNew);

  return taosHashGet(pHash, key, keyLen);
}

L
Liu Jicong 已提交
344
/*
 * Handle TDMT_MND_MQ_TIMER: scan all consumers and collect, per subscription
 * key, the consumers that need a rebalance.
 *
 *  - Every consumer's hbStatus is incremented. Once it exceeds
 *    MND_SUBSCRIBE_REBALANCE_CNT, an ACTIVE consumer is switched to LOST and
 *    added to `lostConsumers` of each of its current topics.
 *  - A consumer in INIT state is added to `newConsumers` of each current
 *    topic; one in MODIFY state is added to `removedConsumers` of each
 *    recently removed topic, after which that list is freed and cleared.
 *
 * When at least one subscription was collected, a TDMT_MND_MQ_DO_REBALANCE
 * message carrying the hash is put on the mnode write queue; otherwise the
 * hash and the message are released here.
 *
 * Returns 0, or -1 when the rebalance message cannot be allocated.
 *
 * NOTE(review): the string returned by mndMakeSubscribeKey is never freed
 * here, and objects returned by sdbFetch are not released -- confirm the
 * ownership rules of tNewSMqRebSubscribe and sdbFetch.
 */
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
  SMnode            *pMnode = pMsg->pMnode;
  SSdb              *pSdb = pMnode->pSdb;
  SMqConsumerObj    *pConsumer;
  void              *pIter = NULL;
  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
  if (pRebMsg == NULL) {
    return -1;
  }
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  if (pRebMsg->rebSubHash == NULL) {
    rpcFreeCont(pRebMsg);
    return -1;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) {
      // Only the caller that wins the ACTIVE -> LOST transition reports the loss.
      int32_t old =
          atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
      if (old == MQ_CONSUMER_STATUS__ACTIVE) {
        // get all topics of that consumer
        int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
        for (int32_t i = 0; i < sz; i++) {
          char *topic = taosArrayGetP(pConsumer->currentTopics, i);
          char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
          if (key == NULL) continue;
          SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
          if (pRebSub == NULL) continue;
          taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
        }
      }
    }

    int32_t status = atomic_load_32(&pConsumer->status);
    if (status == MQ_CONSUMER_STATUS__INIT || status == MQ_CONSUMER_STATUS__MODIFY) {
      SArray *rebSubs;
      if (status == MQ_CONSUMER_STATUS__INIT) {
        rebSubs = pConsumer->currentTopics;
      } else {
        rebSubs = pConsumer->recentRemovedTopics;
      }

      int32_t sz = taosArrayGetSize(rebSubs);
      for (int32_t i = 0; i < sz; i++) {
        char *topic = taosArrayGetP(rebSubs, i);
        char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
        if (key == NULL) continue;
        SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
        if (pRebSub == NULL) continue;
        if (status == MQ_CONSUMER_STATUS__INIT) {
          taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
        } else if (status == MQ_CONSUMER_STATUS__MODIFY) {
          taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
        }
      }

      if (status == MQ_CONSUMER_STATUS__MODIFY) {
        int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics);
        for (int32_t i = 0; i < removeSz; i++) {
          // The array stores char * elements: fetch the stored pointer
          // (taosArrayGetP), not the address of the slot inside the array.
          char *topicName = taosArrayGetP(pConsumer->recentRemovedTopics, i);
          free(topicName);
        }
        taosArrayClear(pConsumer->recentRemovedTopics);
      }
    }
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg)};
    pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
  } else {
    // Nothing to rebalance: the message is not sent, so release it here.
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
  }
  return 0;
}

/*
 * Handle TDMT_MND_MQ_DO_REBALANCE: redistribute the vgroups of every
 * subscription listed in the request's rebSubHash.
 *
 * For each subscription:
 *   1. consumers reported as lost are removed; their vgroups go back to the
 *      subscription's unassigned list;
 *   2. each remaining consumer gets vgNum / consumerNum vgroups, the first
 *      (vgNum % consumerNum) consumers one more; surplus vgroups are moved to
 *      the unassigned list, and consumers whose share or status changed are
 *      written to the transaction's redo log;
 *   3. unassigned vgroups are handed to consumers below their share; a vgroup
 *      that had no previous consumer (oldConsumerId == -1) gets a set-conn
 *      request, any other a rebalance request;
 *   4. the updated subscription is written to the redo log.
 *
 * The hash inside the request is cleaned up on every exit path. Returns 0 on
 * success and -1 when the transaction cannot be created or prepared.
 *
 * Objects that can no longer be acquired (subscription, consumer, topic) are
 * logged and skipped instead of being dereferenced.
 */
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
  SMnode            *pMnode = pMsg->pMnode;
  SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont;
  STrans            *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
  void              *pIter = NULL;

  if (pTrans == NULL) {
    mError("mq-rebalance: failed to create trans since %s", terrstr());
    taosHashCleanup(pReq->rebSubHash);
    return -1;
  }

  mInfo("mq rebalance start");

  while (1) {
    pIter = taosHashIterate(pReq->rebSubHash, pIter);
    if (pIter == NULL) break;
    SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
    if (pSub == NULL) {
      mError("mq rebalance: subscription %s not found, skipped", pRebSub->key);
      continue;
    }

    mInfo("mq rebalance subscription: %s", pSub->key);

    // remove lost consumer
    for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
      int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i);

      mInfo("mq remove lost consumer %ld", lostConsumerId);

      for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
        if (pSubConsumer->consumerId == lostConsumerId) {
          // Return its vgroups to the pool before dropping the consumer entry.
          taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo);
          taosArrayPush(pSub->lostConsumers, pSubConsumer);
          taosArrayRemove(pSub->consumers, j);
          break;
        }
      }
    }

    // calculate rebalance
    int32_t consumerNum = taosArrayGetSize(pSub->consumers);
    if (consumerNum != 0) {
      int32_t vgNum = pSub->vgNum;
      int32_t vgEachConsumer = vgNum / consumerNum;
      int32_t imbalanceVg = vgNum % consumerNum;

      // iterate all consumers, move surplus vgroups to the unassigned list
      for (int32_t i = 0; i < consumerNum; i++) {
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
        int32_t         vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
        int32_t         vgThisConsumerAfterRb;
        if (i < imbalanceVg)
          vgThisConsumerAfterRb = vgEachConsumer + 1;
        else
          vgThisConsumerAfterRb = vgEachConsumer;

        mInfo("mq consumer:%ld, connectted vgroup number change from %d to %d", pSubConsumer->consumerId,
              vgThisConsumerBeforeRb, vgThisConsumerAfterRb);

        while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
          SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
          ASSERT(pConsumerEp != NULL);
          ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
          taosArrayPush(pSub->unassignedVg, pConsumerEp);
        }

        SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
        if (pRebConsumer == NULL) {
          mError("mq rebalance: consumer %ld not found, status not updated", pSubConsumer->consumerId);
          continue;
        }
        int32_t status = atomic_load_32(&pRebConsumer->status);
        if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
            (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
            (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
          // A changed assignment bumps the epoch so the consumer refetches it.
          if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {
            pRebConsumer->epoch++;
          }
          if (vgThisConsumerAfterRb != 0) {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
          } else {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
          }

          mInfo("mq consumer:%ld, status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status);

          SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
          sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
          mndTransAppendRedolog(pTrans, pConsumerRaw);
        }
        mndReleaseConsumer(pMnode, pRebConsumer);
      }

      // assign to vgroup
      if (taosArrayGetSize(pSub->unassignedVg) != 0) {
        for (int32_t i = 0; i < consumerNum; i++) {
          SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
          int32_t         vgThisConsumerAfterRb;
          if (i < imbalanceVg)
            vgThisConsumerAfterRb = vgEachConsumer + 1;
          else
            vgThisConsumerAfterRb = vgEachConsumer;

          while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
            SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
            ASSERT(pConsumerEp != NULL);

            pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
            pConsumerEp->consumerId = pSubConsumer->consumerId;
            // The push happens before any `continue` below, so the loop always advances.
            taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);

            if (pConsumerEp->oldConsumerId == -1) {
              // First assignment of this vgroup: the vnode needs a set-conn request.
              char *topic = NULL;
              char *cgroup = NULL;
              if (mndSplitSubscribeKey(pSub->key, &topic, &cgroup) != 0 || topic == NULL || cgroup == NULL) {
                mError("mq set conn: failed to split subscribe key %s", pSub->key);
                free(topic);
                free(cgroup);
                continue;
              }
              SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
              if (pTopic == NULL) {
                mError("mq set conn: topic %s not found", topic);
                free(topic);
                free(cgroup);
                continue;
              }

              mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic,
                    pConsumerEp->consumerId);

              if (mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp) != 0) {
                mError("mq set conn: failed to persist request for vgroup %d since %s", pConsumerEp->vgId, terrstr());
              }
              mndReleaseTopic(pMnode, pTopic);
              free(topic);
              free(cgroup);
            } else {
              mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId,
                    pConsumerEp->oldConsumerId, pConsumerEp->consumerId);

              if (mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp) != 0) {
                mError("mq rebalance: failed to persist request for vgroup %d since %s", pConsumerEp->vgId, terrstr());
              }
            }
          }
        }
      }
      ASSERT(taosArrayGetSize(pSub->unassignedVg) == 0);

      // TODO: log rebalance statistics
      SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
      sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
      mndTransAppendRedolog(pTrans, pSubRaw);
    }
    mndReleaseSubscribe(pMnode, pSub);
  }

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    taosHashCleanup(pReq->rebSubHash);
    mndTransDrop(pTrans);
    return -1;
  }

  taosHashCleanup(pReq->rebSubHash);
  mndTransDrop(pTrans);
  return 0;
}

#if 0
L
Liu Jicong 已提交
557 558
      for (int32_t j = 0; j < consumerNum; j++) {
        bool            changed = false;
L
Liu Jicong 已提交
559 560 561 562 563 564 565 566 567
        bool            unfished = false;

        bool            canUseLeft = imbalanceSolved < imbalanceVg;
        bool            mustUseLeft = canUseLeft && (imbalanceVg - imbalanceSolved >= consumerNum - j);
        ASSERT(imbalanceVg - imbalanceSolved <= consumerNum - j);

        int32_t maxVg = vgEachConsumer + canUseLeft;
        int32_t minVg = vgEachConsumer + mustUseLeft;

L
Liu Jicong 已提交
568
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
L
Liu Jicong 已提交
569 570 571 572 573 574 575 576 577
        int32_t         vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
        int32_t         vgThisConsumerAfterRb;
        if (vgThisConsumerBeforeRb > maxVg) {
          vgThisConsumerAfterRb = maxVg; 
          imbalanceSolved++;
          changed = true;
        } else if (vgThisConsumerBeforeRb < minVg) {
          vgThisConsumerAfterRb = minVg;
          if (mustUseLeft) imbalanceSolved++;
L
Liu Jicong 已提交
578
          changed = true;
L
Liu Jicong 已提交
579 580 581 582 583 584 585
        } else {
          vgThisConsumerAfterRb = vgThisConsumerBeforeRb;
        }

        if (vgThisConsumerBeforeRb > vgThisConsumerAfterRb) {
          while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
            // put into unassigned
L
Liu Jicong 已提交
586 587
            SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
            ASSERT(pConsumerEp != NULL);
L
Liu Jicong 已提交
588
            ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
L
Liu Jicong 已提交
589 590
            taosArrayPush(unassignedVgStash, pConsumerEp);
          }
L
Liu Jicong 已提交
591 592

        } else if (vgThisConsumerBeforeRb < vgThisConsumerAfterRb) {
L
Liu Jicong 已提交
593
          // assign from unassigned
L
Liu Jicong 已提交
594
          while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
L
Liu Jicong 已提交
595 596
            // if no unassgined, save j
            if (taosArrayGetSize(unassignedVgStash) == 0) {
L
Liu Jicong 已提交
597 598
              taosArrayPush(unassignedConsumerIdx, &j);
              unfished = true;
L
Liu Jicong 已提交
599 600
              break;
            }
L
Liu Jicong 已提交
601
            // assign vg to consumer
L
Liu Jicong 已提交
602 603 604 605 606 607 608 609
            SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
            ASSERT(pConsumerEp != NULL);
            pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
            pConsumerEp->consumerId = pSubConsumer->consumerId;
            taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
            // build msg and persist into trans
          }
        }
L
Liu Jicong 已提交
610 611

        if (changed && !unfished) {
L
Liu Jicong 已提交
612 613
          SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
          pRebConsumer->epoch++;
L
Liu Jicong 已提交
614 615 616 617 618 619
          if (vgThisConsumerAfterRb != 0) {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
          } else {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
          }
          SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
L
Liu Jicong 已提交
620
          sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
L
Liu Jicong 已提交
621
          mndTransAppendRedolog(pTrans, pConsumerRaw);
L
Liu Jicong 已提交
622 623
          mndReleaseConsumer(pMnode, pRebConsumer);
          // TODO: save history
L
Liu Jicong 已提交
624 625 626
        }
      }

L
Liu Jicong 已提交
627 628 629
      for (int32_t j = 0; j < taosArrayGetSize(unassignedConsumerIdx); j++) {
        bool            canUseLeft = imbalanceSolved < imbalanceVg;
        int32_t         consumerIdx = *(int32_t *)taosArrayGet(unassignedConsumerIdx, j);
L
Liu Jicong 已提交
630
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, consumerIdx);
L
Liu Jicong 已提交
631 632 633 634 635
        if (canUseLeft) imbalanceSolved++;
        // must use
        int32_t vgThisConsumerAfterRb = taosArrayGetSize(pSubConsumer->vgInfo) + canUseLeft;
        while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer + canUseLeft) {
          // assign vg to consumer
L
Liu Jicong 已提交
636 637 638 639 640 641 642
          SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
          ASSERT(pConsumerEp != NULL);
          pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
          pConsumerEp->consumerId = pSubConsumer->consumerId;
          taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
          // build msg and persist into trans
        }
L
Liu Jicong 已提交
643 644 645 646 647 648 649 650
        SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
        pRebConsumer->epoch++;
        atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
        SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
        sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
        mndTransAppendRedolog(pTrans, pConsumerRaw);
        mndReleaseConsumer(pMnode, pRebConsumer);
        // TODO: save history
L
Liu Jicong 已提交
651
      }
L
Liu Jicong 已提交
652
#endif
L
Liu Jicong 已提交
653 654 655

#if 0
    //update consumer status for the subscribption
S
Shengliang Guan 已提交
656
    for (int32_t i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
L
Liu Jicong 已提交
657 658
      SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
      int64_t        consumerId = pCEp->consumerId;
L
Liu Jicong 已提交
659 660 661 662 663
      if (pCEp->status != -1) {
        int32_t consumerHbStatus = atomic_fetch_add_32(&pCEp->consumerHbStatus, 1);
        if (consumerHbStatus < MND_SUBSCRIBE_REBALANCE_CNT) {
          continue;
        }
L
Liu Jicong 已提交
664
        // put consumer into lostConsumer
L
Liu Jicong 已提交
665 666
        SMqConsumerEp* lostConsumer = taosArrayPush(pSub->lostConsumer, pCEp);
        lostConsumer->qmsg = NULL;
L
Liu Jicong 已提交
667
        // put vg into unassigned
L
Liu Jicong 已提交
668 669 670 671 672
        taosArrayPush(pSub->unassignedVg, pCEp);
        // remove from assigned
        // TODO: swap with last one, reduce size and reset i
        taosArrayRemove(pSub->assigned, i);
        // remove from available consumer
S
Shengliang Guan 已提交
673
        for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
L
Liu Jicong 已提交
674
          if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) {
L
Liu Jicong 已提交
675 676 677 678 679
            taosArrayRemove(pSub->availConsumer, j);
            break;
          }
          // TODO: acquire consumer, set status to unavail
        }
L
Liu Jicong 已提交
680
#if 0
L
Liu Jicong 已提交
681 682
        SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId);
        pConsumer->epoch++;
L
Liu Jicong 已提交
683
        printf("current epoch %ld size %ld", pConsumer->epoch, pConsumer->topics->size);
L
Liu Jicong 已提交
684 685 686 687
        SSdbRaw* pRaw = mndConsumerActionEncode(pConsumer);
        sdbSetRawStatus(pRaw, SDB_STATUS_READY);
        sdbWriteNotFree(pMnode->pSdb, pRaw);
        mndReleaseConsumer(pMnode, pConsumer);
L
Liu Jicong 已提交
688
#endif
L
Liu Jicong 已提交
689 690
      }
    }
L
Liu Jicong 已提交
691 692 693 694 695 696 697 698
    // no available consumer, skip rebalance
    if (taosArrayGetSize(pSub->availConsumer) == 0) {
      continue;
    }
    taosArrayGet(pSub->availConsumer, 0);
    // rebalance condition1 : have unassigned vg
    // assign vg to a consumer, trying to find the least assigned one
    if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) {
L
Liu Jicong 已提交
699 700 701 702 703 704
      char *topic = NULL;
      char *cgroup = NULL;
      mndSplitSubscribeKey(pSub->key, &topic, &cgroup);

      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
L
Liu Jicong 已提交
705
      for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
706
        int64_t        consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx);
L
Liu Jicong 已提交
707 708
        pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);

L
Liu Jicong 已提交
709
        SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
L
Liu Jicong 已提交
710
        pCEp->oldConsumerId = pCEp->consumerId;
L
Liu Jicong 已提交
711 712 713
        pCEp->consumerId = consumerId;
        taosArrayPush(pSub->assigned, pCEp);

L
Liu Jicong 已提交
714
        SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
715
        pConsumer->epoch++;
L
Liu Jicong 已提交
716 717 718
        SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);
        sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
        sdbWrite(pMnode->pSdb, pConsumerRaw);
L
Liu Jicong 已提交
719 720
        mndReleaseConsumer(pMnode, pConsumer);

L
Liu Jicong 已提交
721 722 723
        void* msg;
        int32_t msgLen;
        mndBuildRebalanceMsg(&msg, &msgLen, pTopic, pCEp, cgroup, topic);
L
Liu Jicong 已提交
724 725 726

        // persist msg
        STransAction action = {0};
727
        action.epSet = pCEp->epSet;
L
Liu Jicong 已提交
728 729
        action.pCont = msg;
        action.contLen = sizeof(SMsgHead) + msgLen;
L
Liu Jicong 已提交
730 731 732
        action.msgType = TDMT_VND_MQ_SET_CONN;
        mndTransAppendRedoAction(pTrans, &action);

L
Liu Jicong 已提交
733
        // persist data
L
Liu Jicong 已提交
734
        SSdbRaw *pRaw = mndSubActionEncode(pSub);
L
Liu Jicong 已提交
735
        sdbSetRawStatus(pRaw, SDB_STATUS_READY);
L
Liu Jicong 已提交
736 737
        mndTransAppendRedolog(pTrans, pRaw);
      }
L
Liu Jicong 已提交
738

L
Liu Jicong 已提交
739 740 741
      if (mndTransPrepare(pMnode, pTrans) != 0) {
        mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
      }
L
Liu Jicong 已提交
742
      mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
743
      mndTransDrop(pTrans);
L
Liu Jicong 已提交
744 745
      tfree(topic);
      tfree(cgroup);
L
Liu Jicong 已提交
746
    }
L
Liu Jicong 已提交
747
    // rebalance condition2 : imbalance assignment
L
Liu Jicong 已提交
748 749 750
  }
  return 0;
}
L
Liu Jicong 已提交
751
#endif
L
Liu Jicong 已提交
752

S
Shengliang Guan 已提交
753
/*
 * Fill pSub->unassignedVg with one SMqConsumerEp for every vgroup whose dbUid
 * matches the topic's dbUid, and count those vgroups in pSub->vgNum.
 *
 * For each matching vgroup the first subplan of the topic's physical plan is
 * pointed at that vgroup, converted into a task list, and the first task's
 * address and message are copied into the new entry (consumerId = -1, i.e. not
 * yet assigned to any consumer).
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC
 * when the plan cannot be converted into a task list.
 */
static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
  SSdb      *pSdb = pMnode->pSdb;
  SVgObj    *pVgroup = NULL;
  SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
  SArray    *pArray = NULL;
  SArray    *inner = taosArrayGet(pDag->pSubplans, 0);
  SSubplan  *plan = taosArrayGetP(inner, 0);
  SArray    *unassignedVg = pSub->unassignedVg;

  void *pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pTopic->dbUid) {
      // vgroup belongs to another database
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;
    // retarget the plan at this vgroup before turning it into tasks
    plan->execNode.nodeId = pVgroup->vgId;
    plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup);

    if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
      terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
      mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
      // leaving the loop early: drop the fetched row and close the iterator
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      return -1;
    }

    SMqConsumerEp consumerEp = {0};
    consumerEp.status = 0;
    consumerEp.consumerId = -1;
    STaskInfo *pTaskInfo = taosArrayGet(pArray, 0);
    consumerEp.epSet = pTaskInfo->addr.epset;
    consumerEp.vgId = pTaskInfo->addr.nodeId;

    ASSERT(consumerEp.vgId == pVgroup->vgId);
    consumerEp.qmsg = strdup(pTaskInfo->msg->msg);
    taosArrayPush(unassignedVg, &consumerEp);
    // TODO: free taskInfo
    taosArrayDestroy(pArray);

    // everything needed from the vgroup has been copied by value
    sdbRelease(pSdb, pVgroup);
  }

  // NOTE(review): pDag is never destroyed; the original call is left disabled below.
  /*qDestroyQueryDag(pDag);*/
  return 0;
}

S
Shengliang Guan 已提交
799 800
/*
 * Build a TDMT_VND_MQ_SET_CONN request for the vgroup described by pConsumerEp
 * and append it to pTrans as a redo action.
 *
 * The message is an SMsgHead (contLen and vgId in network byte order) followed
 * by the encoded SMqSetCVgReq. On success the buffer is handed to the
 * transaction action; on every failure path it is freed here.
 *
 * Returns 0 on success, -1 on failure (terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * message buffer cannot be allocated).
 */
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
                                      const SMqConsumerEp *pConsumerEp) {
  // this path is only for vgroups that had no previous consumer
  ASSERT(pConsumerEp->oldConsumerId == -1);
  int32_t vgId = pConsumerEp->vgId;

  SMqSetCVgReq req = {
      .vgId = vgId,
      .consumerId = pConsumerEp->consumerId,
      .sql = pTopic->sql,
      .logicalPlan = pTopic->logicalPlan,
      .physicalPlan = pTopic->physicalPlan,
      .qmsg = pConsumerEp->qmsg,
  };

  // NOTE(review): unbounded copies; assumes both names fit their destination fields.
  strcpy(req.cgroup, cgroup);
  strcpy(req.topicName, pTopic->name);

  // encoder is called with NULL first to obtain the encoded length
  int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
  void   *buf = malloc(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(vgId);

  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSMqSetCVgReq(&abuf, &req);

  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
  if (pVgObj == NULL) {
    // the vgroup cannot be acquired, so there is no endpoint to address
    // NOTE(review): presumably mndAcquireVgroup has set terrno -- confirm
    free(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_SET_CONN;

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    free(buf);
    return -1;
  }
  return 0;
}

/* Cleanup hook of the subscribe module; it currently has nothing to do. */
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;  // intentionally unused
}

/*
 * Encode a subscribe object into a newly allocated SDB raw record.
 *
 * Record layout: int32 payload length, the encoded payload, then
 * MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record on success; on failure logs the error, frees the
 * record and returns NULL with terrno left non-zero.
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  // stays in effect unless the whole record is written successfully
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void *payload = NULL;
  // encoder is called with a NULL buffer to obtain the encoded length
  int32_t payloadLen = tEncodeSubscribeObj(NULL, pSub);
  int32_t rawSize = sizeof(int32_t) + payloadLen + MND_SUBSCRIBE_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto ENCODE_FINISHED;

  payload = malloc(payloadLen);
  if (payload == NULL) goto ENCODE_FINISHED;

  void *cursor = payload;
  tEncodeSubscribeObj(&cursor, pSub);

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, payloadLen, ENCODE_FINISHED);
  SDB_SET_BINARY(pRaw, dataPos, payload, payloadLen, ENCODE_FINISHED);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, ENCODE_FINISHED);
  SDB_SET_DATALEN(pRaw, dataPos, ENCODE_FINISHED);

  terrno = TSDB_CODE_SUCCESS;

ENCODE_FINISHED:
  // the temporary payload is no longer needed, whatever the outcome
  tfree(payload);

  if (terrno == 0) {
    mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
    return pRaw;
  }

  mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

/*
 * Decode an SDB raw record into a newly allocated row holding an SMqSubscribeObj.
 *
 * Expected layout: int32 payload length, the encoded payload, then
 * MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
 *
 * Returns the row on success. On failure returns NULL with terrno non-zero
 * (TSDB_CODE_SDB_INVALID_DATA_VER when the stored version differs from
 * MND_SUBSCRIBE_VER_NUMBER).
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Everything the cleanup label touches is initialised before the first goto,
  // so early failures never read indeterminate pointers.
  void            *buf = NULL;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  int32_t          dataPos = 0;
  int32_t          tlen = 0;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver != MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  int32_t size = sizeof(SMqSubscribeObj);
  pRow = sdbAllocRow(size);
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = malloc(tlen + 1);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub) == NULL) {
    goto SUB_DECODE_OVER;
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  tfree(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    // The key is not logged: pSub may be NULL or not decoded at this point.
    mError("subscribe: failed to decode from raw:%p since %s", pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }

  return pRow;
}

/* Insert action for subscribe rows: only traces the key, always returns 0. */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;  // not needed here

  mTrace("subscribe:%s, perform insert action", pSub->key);

  return 0;
}

/*
 * Delete action for subscribe rows: traces the key, then hands the object to
 * tDeleteSMqSubscribeObj. Always returns 0.
 */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;  // not needed here

  mTrace("subscribe:%s, perform delete action", pSub->key);

  tDeleteSMqSubscribeObj(pSub);

  return 0;
}

/*
 * Update action for subscribe rows: only traces the old object's key; nothing
 * is copied from pNewSub. Always returns 0.
 */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  (void)pSdb;     // not needed here
  (void)pNewSub;  // not needed here

  mTrace("subscribe:%s, perform update action", pOldSub->key);

  return 0;
}

L
Liu Jicong 已提交
943
/*
 * Build the subscription key "<cgroup>:<topicName>".
 *
 * The buffer is sized from the two inputs, so arbitrarily long names cannot
 * overflow it. Returns a malloc'ed NUL-terminated string that the caller must
 * free, or NULL when the allocation fails.
 */
static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) {
  size_t groupLen = strlen(cgroup);
  size_t topicLen = strlen(topicName);

  // group + ':' + topic + terminating NUL
  char *key = malloc(groupLen + 1 + topicLen + 1);
  if (key == NULL) {
    return NULL;
  }

  memcpy(key, cgroup, groupLen);
  key[groupLen] = ':';
  memcpy(key + groupLen + 1, topicName, topicLen + 1);  // copies the NUL as well
  return key;
}

L
Liu Jicong 已提交
955
/*
 * Acquire the subscribe object for a consumer group / topic pair.
 *
 * Returns the object acquired from the SDB_SUBSCRIBE table, or NULL with terrno
 * set to TSDB_CODE_OUT_OF_MEMORY (the key could not be built) or
 * TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST (no such row).
 */
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
  SSdb *pSdb = pMnode->pSdb;
  char *key = mndMakeSubscribeKey(cgroup, topicName);
  if (key == NULL) {
    // never pass a NULL key down to the lookup
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
  free(key);
  if (pSub == NULL) {
    terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  }
  return pSub;
}

L
Liu Jicong 已提交
966 967 968 969
/*
 * Acquire a subscribe object directly by its key.
 * Returns NULL with terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when the lookup fails.
 */
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
975 976 977 978 979 980 981 982 983 984 985
/* Hand a previously acquired subscribe object back to the mnode's SDB. */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  sdbRelease(pMnode->pSdb, pSub);
}

/*
 * Handle a consumer's subscribe request.
 *
 * The requested topic list is sorted and merged against the consumer's current
 * topic list:
 *   - topics present in both lists are left untouched;
 *   - topics only in the old list are cancelled: the consumer's vgroups are
 *     moved back to the subscription's unassigned list and the consumer is
 *     marked MQ_CONSUMER_STATUS__MODIFY;
 *   - topics only in the new list are subscribed: the subscription is created
 *     when missing, the consumer is registered in it and, when an unassigned
 *     vgroup is available, one is handed to the consumer.
 * All resulting actions and the updated consumer object are appended to one
 * transaction.
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS once the transaction is prepared,
 * -1 on failure.
 */
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
  SMnode         *pMnode = pMsg->pMnode;
  char           *msgStr = pMsg->rpcMsg.pCont;
  SCMSubscribeReq subscribe;
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
  int64_t consumerId = subscribe.consumerId;
  char   *cgroup = subscribe.consumerGroup;

  SArray *newSub = subscribe.topicNames;
  int32_t newTopicNum = subscribe.topicNum;

  taosArraySortString(newSub, taosArrayCompareString);

  // The transaction is created before the consumer is acquired or modified, so
  // a failure here leaves no consumer reference held and no state changed.
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
  if (pTrans == NULL) {
    // TODO: free the deserialized request
    return -1;
  }

  SArray *oldSub = NULL;
  int32_t oldTopicNum = 0;
  bool    createConsumer = false;
  // create consumer if not exist
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    pConsumer = mndCreateConsumer(consumerId, cgroup);
    if (pConsumer == NULL) {
      // NOTE(review): presumably mndCreateConsumer has set terrno -- confirm
      mndTransDrop(pTrans);
      return -1;
    }
    createConsumer = true;
  } else {
    pConsumer->epoch++;
    oldSub = pConsumer->currentTopics;
  }
  pConsumer->currentTopics = newSub;

  if (oldSub != NULL) {
    oldTopicNum = taosArrayGetSize(oldSub);
  }

  // merge-walk over the two topic lists
  int32_t i = 0, j = 0;
  while (i < newTopicNum || j < oldTopicNum) {
    char *newTopicName = NULL;
    char *oldTopicName = NULL;
    if (i >= newTopicNum) {
      // encode unset topic msg to all vnodes related to that topic
      oldTopicName = taosArrayGetP(oldSub, j);
      j++;
    } else if (j >= oldTopicNum) {
      newTopicName = taosArrayGetP(newSub, i);
      i++;
    } else {
      newTopicName = taosArrayGetP(newSub, i);
      oldTopicName = taosArrayGetP(oldSub, j);

      int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName);
      if (comp == 0) {
        // topic is in both lists: do nothing
        oldTopicName = newTopicName = NULL;
        i++;
        j++;
        continue;
      } else if (comp < 0) {
        oldTopicName = NULL;
        i++;
      } else {
        newTopicName = NULL;
        j++;
      }
    }

    if (oldTopicName != NULL) {
      ASSERT(newTopicName == NULL);

      // cancel subscribe of old topic
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName);
      ASSERT(pSub);
      int32_t csz = taosArrayGetSize(pSub->consumers);
      for (int32_t ci = 0; ci < csz; ci++) {
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci);
        if (pSubConsumer->consumerId == consumerId) {
          // give every vgroup of this consumer back to the unassigned list
          int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
          for (int32_t vgi = 0; vgi < vgsz; vgi++) {
            SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
            mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp);
            taosArrayPush(pSub->unassignedVg, pConsumerEp);
          }
          taosArrayRemove(pSub->consumers, ci);
          break;
        }
      }
      char *oldTopicNameDup = strdup(oldTopicName);
      taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup);
      atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
      /*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/

      // NOTE(review): the modified subscription is not appended to the
      // transaction in this branch -- confirm this is intended.
      // drop the reference taken by mndAcquireSubscribe above
      mndReleaseSubscribe(pMnode, pSub);
    } else if (newTopicName != NULL) {
      ASSERT(oldTopicName == NULL);

      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
      if (pTopic == NULL) {
        mError("topic being subscribed not exist: %s", newTopicName);
        continue;
      }

      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName);
      bool             createSub = false;
      if (pSub == NULL) {
        mDebug("create new subscription by consumer %ld, group: %s, topic %s", consumerId, cgroup, newTopicName);
        pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
        if (pSub == NULL) {
          // nothing to register the consumer in; skip this topic
          mndReleaseTopic(pMnode, pTopic);
          continue;
        }
        createSub = true;
      }

      SMqSubConsumer mqSubConsumer;
      mqSubConsumer.consumerId = consumerId;
      mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp));
      taosArrayPush(pSub->consumers, &mqSubConsumer);

      // if have un assigned vg, assign one to the consumer
      if (taosArrayGetSize(pSub->unassignedVg) > 0) {
        SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
        pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
        pConsumerEp->consumerId = consumerId;
        taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
        if (pConsumerEp->oldConsumerId == -1) {
          // vgroup never had a consumer: send the full set-connection request
          mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
        } else {
          mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
        }
        // to trigger rebalance at once, do not set status active
        /*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
      }

      SSdbRaw *pRaw = mndSubActionEncode(pSub);
      sdbSetRawStatus(pRaw, SDB_STATUS_READY);
      mndTransAppendRedolog(pTrans, pRaw);

      // only an acquired subscription holds a reference to give back
      if (!createSub) mndReleaseSubscribe(pMnode, pSub);
      mndReleaseTopic(pMnode, pTopic);
    }
  }

  if (oldSub) taosArrayDestroyEx(oldSub, free);

  // persist consumerObj
  SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
  sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
  mndTransAppendRedolog(pTrans, pConsumerRaw);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  mndTransDrop(pTrans);
  if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

L
Liu Jicong 已提交
1138 1139 1140 1141
/* Forward an internal subscribe response to the transaction module; always returns 0. */
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) {
  int32_t code = 0;

  mndTransProcessRsp(pRsp);

  return code;
}
L
Liu Jicong 已提交
1142 1143 1144 1145 1146

/* Cancel an in-progress SDB fetch identified by pIter. */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}