mndSubscribe.c 40.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndSubscribe.h"
L
Liu Jicong 已提交
18 19 20 21
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
22
#include "mndOffset.h"
L
Liu Jicong 已提交
23
#include "mndScheduler.h"
L
Liu Jicong 已提交
24 25 26 27 28 29 30 31 32
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

L
Liu Jicong 已提交
33
#define MND_SUBSCRIBE_VER_NUMBER   1
L
Liu Jicong 已提交
34 35
#define MND_SUBSCRIBE_RESERVE_SIZE 64

L
Liu Jicong 已提交
36
#define MND_SUBSCRIBE_REBALANCE_CNT 3
L
Liu Jicong 已提交
37

L
Liu Jicong 已提交
38 39 40 41 42
enum {
  MQ_SUBSCRIBE_STATUS__ACTIVE = 1,
  MQ_SUBSCRIBE_STATUS__DELETED,
};

L
Liu Jicong 已提交
43
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45 46 47 48 49 50 51 52 53 54
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);

static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
55
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
56
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
57
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
58
static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
59

S
Shengliang Guan 已提交
60 61
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
                                      const SMqConsumerEp *pConsumerEp);
L
Liu Jicong 已提交
62

L
Liu Jicong 已提交
63 64
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp);

S
Shengliang Guan 已提交
65
static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub);
L
Liu Jicong 已提交
66 67 68 69 70 71 72 73 74 75 76

/*
 * Wire the subscribe module into the mnode.
 *
 * Installs the message handlers for the subscribe / set-conn response /
 * rebalance response / timer / get-sub-ep / do-rebalance message types, then
 * registers the SDB_SUBSCRIBE table (binary key) with its encode, decode,
 * insert, update and delete callbacks.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  SSdbTable subscribeTable = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  // client-facing request
  mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);

  // both vnode responses are routed to the same internal handler
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp);

  // periodic timer, endpoint lookup and the rebalance job the timer enqueues
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);

  return sdbSetTable(pMnode->pSdb, subscribeTable);
}

L
Liu Jicong 已提交
85
/*
 * Allocate a subscription object for (cgroup, topic).
 *
 * The subscribe key is built with mndMakeSubscribeKey() and copied into the
 * new object, then mndSchedInitSubEp() is asked to populate it for pTopic.
 *
 * Returns the new object, or NULL with terrno set to
 *   - TSDB_CODE_OUT_OF_MEMORY        when the object cannot be allocated,
 *   - TSDB_CODE_MND_UNSUPPORTED_TOPIC when mndSchedInitSubEp() fails.
 */
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *cgroup) {
  SMqSubscribeObj *pNewSub = tNewSubscribeObj();
  if (pNewSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  char subKey[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(subKey, cgroup, pTopic->name);
  strcpy(pNewSub->key, subKey);

  if (mndSchedInitSubEp(pMnode, pTopic, pNewSub) >= 0) {
    // TODO: disable alter subscribed table
    return pNewSub;
  }

  // scheduling failed: release whatever the object owns, then the object itself
  terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
  tDeleteSMqSubscribeObj(pNewSub);
  free(pNewSub);
  return NULL;
}

L
Liu Jicong 已提交
114
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
L
Liu Jicong 已提交
115
  SMqMVRebReq req = {
L
Liu Jicong 已提交
116 117 118
      .vgId = pConsumerEp->vgId,
      .oldConsumerId = pConsumerEp->oldConsumerId,
      .newConsumerId = pConsumerEp->consumerId,
L
Liu Jicong 已提交
119 120
  };

L
Liu Jicong 已提交
121
  int32_t tlen = tEncodeSMqMVRebReq(NULL, &req);
L
Liu Jicong 已提交
122 123 124 125 126 127 128 129 130
  void   *buf = malloc(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(pConsumerEp->vgId);
L
Liu Jicong 已提交
131

L
Liu Jicong 已提交
132
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
L
Liu Jicong 已提交
133
  tEncodeSMqMVRebReq(&abuf, &req);
L
Liu Jicong 已提交
134

L
Liu Jicong 已提交
135 136 137 138 139 140
  *pBuf = buf;
  *pLen = tlen;

  return 0;
}

L
Liu Jicong 已提交
141
/*
 * Append a TDMT_VND_MQ_REB redo action for pConsumerEp to pTrans.
 *
 * The request is built by mndBuildRebalanceMsg() and addressed to the epset
 * of the vgroup identified by pConsumerEp->vgId. pConsumerEp must describe a
 * hand-over, i.e. oldConsumerId != -1 (asserted).
 *
 * Returns 0 on success. Returns -1 when the message cannot be built, the
 * vgroup cannot be acquired, or the action cannot be appended; the message
 * buffer is freed on every failure path.
 */
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
  ASSERT(pConsumerEp->oldConsumerId != -1);

  void   *buf = NULL;
  int32_t tlen = 0;
  if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp) < 0) {
    return -1;
  }

  int32_t vgId = pConsumerEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
  if (pVgObj == NULL) {
    // Without the vgroup there is no epset to send to; do not hand a NULL
    // object to mndGetVgroupEpset() and do not leak the message.
    free(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;  // tlen excludes the message head
  action.msgType = TDMT_VND_MQ_REB;

  // the epset was copied by value, so the vgroup can be released right away
  mndReleaseVgroup(pMnode, pVgObj);

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    free(buf);
    return -1;
  }

  return 0;
}

static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
  SMqSetCVgReq req = {0};
L
Liu Jicong 已提交
170
  req.consumerId = pConsumerEp->consumerId;
L
Liu Jicong 已提交
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195

  int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
  void   *buf = malloc(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(pConsumerEp->vgId);
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSMqSetCVgReq(&abuf, &req);
  *pBuf = buf;
  *pLen = tlen;
  return 0;
}

/*
 * Append a TDMT_VND_MQ_SET_CONN redo action built by mndBuildCancelConnReq()
 * to pTrans, addressed to the epset of the vgroup pConsumerEp->vgId.
 *
 * Returns 0 on success. Returns -1 when the message cannot be built, the
 * vgroup cannot be acquired, or the action cannot be appended; the message
 * buffer is freed on every failure path.
 */
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
  void   *buf = NULL;
  int32_t tlen = 0;
  if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) {
    return -1;
  }

  int32_t vgId = pConsumerEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
  if (pVgObj == NULL) {
    // Without the vgroup there is no epset to send to; do not hand a NULL
    // object to mndGetVgroupEpset() and do not leak the message.
    free(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;  // tlen excludes the message head
  action.msgType = TDMT_VND_MQ_SET_CONN;

  // the epset was copied by value, so the vgroup can be released right away
  mndReleaseVgroup(pMnode, pVgObj);

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    free(buf);
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
/*
 * mndProcessResetOffsetReq is compiled out by the surrounding #if 0; only its
 * prototype near the top of the file is active.
 *
 * As written, the disabled body decodes an SMqCMResetOffsetReq from the RPC
 * payload, groups the requested offsets per vgId in a hash table and creates
 * a rollback-policy transaction. It then returns 0 without appending anything
 * to the transaction and without releasing the hash table, the per-vgroup
 * entries or the transaction, so it is unfinished work rather than a usable
 * handler.
 */
#if 0
static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg) {
  SMnode             *pMnode = pMsg->pMnode;
  uint8_t            *str = pMsg->rpcMsg.pCont;
  SMqCMResetOffsetReq req;

  SCoder decoder;
  tCoderInit(&decoder, TD_LITTLE_ENDIAN, str, pMsg->rpcMsg.contLen, TD_DECODER);
  tDecodeSMqCMResetOffsetReq(&decoder, &req);

  // vgId -> pointer to the SMqVgOffsets collected for that vgroup
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return -1;
  }

  for (int32_t i = 0; i < req.num; i++) {
    SMqOffset    *pOffset = &req.offsets[i];
    SMqVgOffsets *pVgOffset = taosHashGet(pHash, &pOffset->vgId, sizeof(int32_t));
    if (pVgOffset == NULL) {
      // first offset seen for this vgroup: create its bucket
      pVgOffset = malloc(sizeof(SMqVgOffsets));
      if (pVgOffset == NULL) {
        return -1;
      }
      pVgOffset->offsets = taosArrayInit(0, sizeof(void *));
      taosArrayPush(pVgOffset->offsets, &pOffset);
    }
    // note: executed on every iteration, including when the vgId was already present
    taosHashPut(pHash, &pOffset->vgId, sizeof(int32_t), &pVgOffset, sizeof(void *));
  }

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
  if (pTrans == NULL) {
    mError("mq-reset-offset: failed since %s", terrstr());
    return -1;
  }

  return 0;
}
#endif

L
Liu Jicong 已提交
253 254
/*
 * Handle TDMT_MND_GET_SUB_EP: tell a consumer which vgroups it is assigned.
 *
 * The request carries the consumer id (big-endian 64-bit), the consumer's
 * current epoch (network byte order) and its cgroup. The handler
 *   1. acquires the consumer and resets its hbStatus counter to 0 (the
 *      counter is incremented by mndProcessMqTimerMsg on every timer tick);
 *   2. if the consumer's epoch differs from the server's, collects for every
 *      topic in pConsumer->currentTopics the vgroups assigned to this consumer
 *      (epset, vgId and the stored offset, or -1 when no offset is recorded);
 *   3. encodes the response after an SMqRspHead and attaches it to pMsg.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_MND_CONSUMER_NOT_EXIST when the consumer is unknown, or to
 * TSDB_CODE_OUT_OF_MEMORY when the response buffer cannot be allocated; in the
 * latter case the consumer reference and the partially built response are
 * released before returning.
 */
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
  SMnode           *pMnode = pMsg->pMnode;
  SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
  SMqCMGetSubEpRsp  rsp = {0};
  int64_t           consumerId = be64toh(pReq->consumerId);
  int32_t           epoch = ntohl(pReq->epoch);

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId);
  if (pConsumer == NULL) {
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }
  ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);

  // TODO
  // this request doubles as a heartbeat: clear the missed-tick counter
  int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
  mTrace("try to get sub ep, old val: %d", hbStatus);
  atomic_store_32(&pConsumer->hbStatus, 0);

  strcpy(rsp.cgroup, pReq->cgroup);
  rsp.consumerId = consumerId;

  // Only send the assignment when the consumer's view is stale; otherwise
  // rsp.topics stays NULL and only the header fields are returned.
  if (epoch != pConsumer->epoch) {
    mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch);
    SArray *pTopics = pConsumer->currentTopics;
    int32_t sz = taosArrayGetSize(pTopics);
    rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
    for (int32_t i = 0; i < sz; i++) {
      char            *topicName = taosArrayGetP(pTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
      ASSERT(pSub);
      int32_t csz = taosArrayGetSize(pSub->consumers);
      // TODO: change to bsearch
      for (int32_t j = 0; j < csz; j++) {
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
        if (consumerId != pSubConsumer->consumerId) {
          continue;
        }

        int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
        // zero-initialise so that no indeterminate bytes are copied into rsp.topics
        SMqSubTopicEp topicEp = {0};
        strcpy(topicEp.topic, topicName);
        topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
        for (int32_t k = 0; k < vgsz; k++) {
          char           offsetKey[TSDB_PARTITION_KEY_LEN];
          SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
          // offset -1 means "no offset recorded for this partition"
          SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId, .offset = -1};
          mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId);
          SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
          if (pOffsetObj != NULL) {
            vgEp.offset = pOffsetObj->offset;
            mndReleaseOffset(pMnode, pOffsetObj);
          }
          taosArrayPush(topicEp.vgs, &vgEp);
        }
        taosArrayPush(rsp.topics, &topicEp);
        break;  // a consumer appears at most once per subscription
      }
      mndReleaseSubscribe(pMnode, pSub);
    }
  }

  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    // release everything acquired so far; the original leaked both on this path
    tDeleteSMqCMGetSubEpRsp(&rsp);
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = pConsumer->epoch;

  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
  tDeleteSMqCMGetSubEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  pMsg->pCont = buf;
  pMsg->contLen = tlen;
  return 0;
}

L
Liu Jicong 已提交
332
/*
 * Split a subscribe key at the first TMQ_SEPARATOR.
 *
 * The part before the separator is written to `topic` and the part after it
 * to `cgroup`, both NUL-terminated. The caller must provide buffers large
 * enough for the respective parts.
 *
 * Returns 0 on success. If `key` contains no separator, both outputs are set
 * to the empty string and -1 is returned (the original scanned past the end
 * of the string in that case).
 *
 * NOTE(review): mndMakeSubscribeKey() takes (key, cgroup, topicName). If it
 * lays the key out as "<cgroup><sep><topic>", the two outputs here are
 * swapped relative to their names — verify against its definition.
 */
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) {
  int32_t i = 0;
  while (key[i] != TMQ_SEPARATOR && key[i] != '\0') {
    i++;
  }

  if (key[i] == '\0') {
    topic[0] = '\0';
    cgroup[0] = '\0';
    return -1;
  }

  // Copy exactly the i bytes that precede the separator. The original copied
  // i - 1 bytes but terminated at index i, which dropped the last character
  // and left topic[i - 1] uninitialized.
  memcpy(topic, key, i);
  topic[i] = '\0';
  strcpy(cgroup, &key[i + 1]);
  return 0;
}

L
Liu Jicong 已提交
343 344 345 346 347 348 349 350 351 352 353 354 355
/*
 * Look up the rebalance entry for `key` in pHash, creating it on first use.
 *
 * taosHashPut() is given sizeof(SMqRebSubscribe) bytes, i.e. the hash stores
 * its own copy of the struct (mndProcessDoRebalanceMsg reads the entries back
 * directly as SMqRebSubscribe). The pointer returned here is therefore the
 * hash-resident entry whenever the insert succeeded, for both the first and
 * later calls.
 *
 * Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the entry cannot be
 * created.
 */
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t keyLen = strlen(key);

  SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub != NULL) {
    return pRebSub;
  }

  SMqRebSubscribe *pNew = tNewSMqRebSubscribe(key);
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  taosHashPut(pHash, key, keyLen, pNew, sizeof(SMqRebSubscribe));

  pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub == NULL) {
    // insert did not take effect: keep the original behaviour and hand back
    // the temporary object so the caller still has something to fill in
    return pNew;
  }

  // The hash now holds a shallow copy that shares the member pointers, so only
  // the temporary shell is released (the original leaked it).
  // NOTE(review): assumes tNewSMqRebSubscribe() allocates the shell with
  // malloc/calloc, like the other objects freed with free() in this file.
  free(pNew);
  return pRebSub;
}

L
Liu Jicong 已提交
356
/*
 * Handle TDMT_MND_MQ_TIMER: scan all consumers and decide whether a rebalance
 * is needed.
 *
 * For every consumer in the SDB:
 *   - hbStatus is incremented; once it exceeds MND_SUBSCRIBE_REBALANCE_CNT an
 *     ACTIVE consumer is switched to LOST and recorded in `lostConsumers` of
 *     each subscription it currently holds;
 *   - a consumer in INIT state is recorded in `newConsumers` of each of its
 *     current topics;
 *   - a consumer in MODIFY state is recorded in `removedConsumers` of each
 *     recently removed topic, after which those topic names are freed and the
 *     list is cleared.
 *
 * If anything was recorded, the collected hash is sent to the write queue as
 * a TDMT_MND_MQ_DO_REBALANCE message, which takes over the hash and the
 * message buffer; otherwise both are released here.
 *
 * Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * message or its hash table cannot be allocated.
 *
 * NOTE(review): consumers obtained through sdbFetch() are not released in this
 * function — confirm whether sdbFetch() hands out a reference that needs it.
 */
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
  SMnode         *pMnode = pMsg->pMnode;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
  if (pRebMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  if (pRebMsg->rebSubHash == NULL) {
    rpcFreeCont(pRebMsg);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) {
      // the compare-exchange makes sure the ACTIVE -> LOST transition is
      // reported exactly once
      int32_t old =
          atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
      if (old == MQ_CONSUMER_STATUS__ACTIVE) {
        // get all topics of that consumer
        int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
        for (int32_t i = 0; i < sz; i++) {
          char *topic = taosArrayGetP(pConsumer->currentTopics, i);
          char  key[TSDB_SUBSCRIBE_KEY_LEN];
          mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
          SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
          if (pRebSub == NULL) {
            mError("mq timer: failed to create rebalance entry for subscription %s", key);
            continue;
          }
          taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
        }
      }
    }

    int32_t status = atomic_load_32(&pConsumer->status);
    if (status != MQ_CONSUMER_STATUS__INIT && status != MQ_CONSUMER_STATUS__MODIFY) {
      continue;
    }

    // INIT consumers join their current topics; MODIFY consumers leave the
    // topics they recently removed
    SArray *rebSubs =
        (status == MQ_CONSUMER_STATUS__INIT) ? pConsumer->currentTopics : pConsumer->recentRemovedTopics;
    int32_t sz = taosArrayGetSize(rebSubs);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(rebSubs, i);
      char  key[TSDB_SUBSCRIBE_KEY_LEN];
      mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
      SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
      if (pRebSub == NULL) {
        mError("mq timer: failed to create rebalance entry for subscription %s", key);
        continue;
      }
      if (status == MQ_CONSUMER_STATUS__INIT) {
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      } else {
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
    }

    if (status == MQ_CONSUMER_STATUS__MODIFY) {
      int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics);
      for (int32_t i = 0; i < removeSz; i++) {
        // The array stores char* elements (it is read with taosArrayGetP
        // above), so the string itself must be fetched with taosArrayGetP;
        // taosArrayGet would yield the address of the slot inside the array
        // storage, which must not be freed.
        char *topicName = taosArrayGetP(pConsumer->recentRemovedTopics, i);
        free(topicName);
      }
      taosArrayClear(pConsumer->recentRemovedTopics);
    }
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg)};
    pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
  }
  return 0;
}

/*
 * Handle TDMT_MND_MQ_DO_REBALANCE: redistribute vgroups among the consumers
 * of every subscription listed in pReq->rebSubHash, inside one transaction.
 *
 * For each subscription in the hash:
 *   1. consumers listed in pRebSub->lostConsumers are removed from
 *      pSub->consumers; their vgroups go to pSub->unassignedVg and the
 *      consumer entry to pSub->lostConsumers;
 *   2. if consumers remain, each gets vgNum / consumerNum vgroups and the
 *      first (vgNum % consumerNum) consumers get one more. Surplus vgroups are
 *      moved to unassignedVg, the consumer's epoch/status is updated and
 *      appended to the redo log when something changed;
 *   3. unassigned vgroups are handed out to consumers below their target. A
 *      vgroup that had no owner (oldConsumerId == -1) gets a set-conn
 *      request, otherwise a rebalance request;
 *   4. the updated subscription is appended to the redo log.
 * Only pRebSub->lostConsumers is consumed here; newConsumers and
 * removedConsumers are not read by this function.
 *
 * The hash table in the request is always cleaned up before returning.
 * Returns 0 on success, -1 when the transaction cannot be created or prepared.
 */
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
  SMnode            *pMnode = pMsg->pMnode;
  SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont;
  void              *pIter = NULL;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
  if (pTrans == NULL) {
    mError("mq rebalance: failed to create transaction since %s", terrstr());
    taosHashCleanup(pReq->rebSubHash);
    return -1;
  }

  mInfo("mq rebalance start");

  while (1) {
    pIter = taosHashIterate(pReq->rebSubHash, pIter);
    if (pIter == NULL) break;
    SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
    if (pSub == NULL) {
      mError("mq rebalance: subscription %s not found, skipped", pRebSub->key);
      continue;
    }

    mInfo("mq rebalance subscription: %s", pSub->key);

    // remove lost consumer
    for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
      int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i);

      mInfo("mq remove lost consumer %ld", lostConsumerId);

      for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
        if (pSubConsumer->consumerId == lostConsumerId) {
          // order matters: copy the vgroups and the entry out before the
          // entry is removed from pSub->consumers
          taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo);
          taosArrayPush(pSub->lostConsumers, pSubConsumer);
          taosArrayRemove(pSub->consumers, j);
          break;
        }
      }
    }

    // calculate rebalance
    int32_t consumerNum = taosArrayGetSize(pSub->consumers);
    if (consumerNum != 0) {
      int32_t vgNum = pSub->vgNum;
      int32_t vgEachConsumer = vgNum / consumerNum;
      int32_t imbalanceVg = vgNum % consumerNum;

      // iterate all consumers, move surplus vgroups to unassignedVg
      for (int32_t i = 0; i < consumerNum; i++) {
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
        int32_t         vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
        // the first `imbalanceVg` consumers absorb the remainder
        int32_t         vgThisConsumerAfterRb = (i < imbalanceVg) ? vgEachConsumer + 1 : vgEachConsumer;

        mInfo("mq consumer:%ld, connectted vgroup number change from %d to %d", pSubConsumer->consumerId,
              vgThisConsumerBeforeRb, vgThisConsumerAfterRb);

        while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
          SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
          ASSERT(pConsumerEp != NULL);
          ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
          taosArrayPush(pSub->unassignedVg, pConsumerEp);
        }

        SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
        if (pRebConsumer == NULL) {
          mError("mq rebalance: consumer %ld not found, status not updated", pSubConsumer->consumerId);
          continue;
        }
        int32_t status = atomic_load_32(&pRebConsumer->status);
        if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
            (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
            (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
          // bump the epoch only when the assignment itself changed, so the
          // consumer re-fetches its endpoints
          if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {
            pRebConsumer->epoch++;
          }
          if (vgThisConsumerAfterRb != 0) {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
          } else {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
          }

          mInfo("mq consumer:%ld, status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status);

          SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
          sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
          mndTransAppendRedolog(pTrans, pConsumerRaw);
        }
        mndReleaseConsumer(pMnode, pRebConsumer);
      }

      // assign to vgroup
      if (taosArrayGetSize(pSub->unassignedVg) != 0) {
        for (int32_t i = 0; i < consumerNum; i++) {
          SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
          int32_t         vgThisConsumerAfterRb = (i < imbalanceVg) ? vgEachConsumer + 1 : vgEachConsumer;

          while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
            SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
            ASSERT(pConsumerEp != NULL);

            pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
            pConsumerEp->consumerId = pSubConsumer->consumerId;
            taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);

            if (pConsumerEp->oldConsumerId == -1) {
              // vgroup had no owner yet: establish the connection
              char topic[TSDB_TOPIC_FNAME_LEN];
              char cgroup[TSDB_CGROUP_LEN];
              mndSplitSubscribeKey(pSub->key, topic, cgroup);
              // NOTE(review): pTopic is not checked for NULL before use — confirm
              // whether mndPersistMqSetConnReq/mndReleaseTopic tolerate it.
              SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);

              mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic,
                    pConsumerEp->consumerId);

              if (mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp) < 0) {
                mError("mq set conn: failed to persist request for vgroup %d", pConsumerEp->vgId);
              }
              mndReleaseTopic(pMnode, pTopic);
            } else {
              // vgroup moves from one consumer to another
              mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId,
                    pConsumerEp->oldConsumerId, pConsumerEp->consumerId);

              if (mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp) < 0) {
                mError("mq rebalance: failed to persist request for vgroup %d", pConsumerEp->vgId);
              }
            }
          }
        }
      }
      ASSERT(taosArrayGetSize(pSub->unassignedVg) == 0);

      // TODO: log rebalance statistics
      SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
      sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
      mndTransAppendRedolog(pTrans, pSubRaw);
    }
    mndReleaseSubscribe(pMnode, pSub);
  }

  int32_t code = 0;
  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    code = -1;
  }

  taosHashCleanup(pReq->rebSubHash);
  mndTransDrop(pTrans);
  return code;
}

#if 0
L
Liu Jicong 已提交
569 570
      for (int32_t j = 0; j < consumerNum; j++) {
        bool            changed = false;
L
Liu Jicong 已提交
571 572 573 574 575 576 577 578 579
        bool            unfished = false;

        bool            canUseLeft = imbalanceSolved < imbalanceVg;
        bool            mustUseLeft = canUseLeft && (imbalanceVg - imbalanceSolved >= consumerNum - j);
        ASSERT(imbalanceVg - imbalanceSolved <= consumerNum - j);

        int32_t maxVg = vgEachConsumer + canUseLeft;
        int32_t minVg = vgEachConsumer + mustUseLeft;

L
Liu Jicong 已提交
580
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
L
Liu Jicong 已提交
581 582 583 584 585 586 587 588 589
        int32_t         vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
        int32_t         vgThisConsumerAfterRb;
        if (vgThisConsumerBeforeRb > maxVg) {
          vgThisConsumerAfterRb = maxVg; 
          imbalanceSolved++;
          changed = true;
        } else if (vgThisConsumerBeforeRb < minVg) {
          vgThisConsumerAfterRb = minVg;
          if (mustUseLeft) imbalanceSolved++;
L
Liu Jicong 已提交
590
          changed = true;
L
Liu Jicong 已提交
591 592 593 594 595 596 597
        } else {
          vgThisConsumerAfterRb = vgThisConsumerBeforeRb;
        }

        if (vgThisConsumerBeforeRb > vgThisConsumerAfterRb) {
          while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
            // put into unassigned
L
Liu Jicong 已提交
598 599
            SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
            ASSERT(pConsumerEp != NULL);
L
Liu Jicong 已提交
600
            ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
L
Liu Jicong 已提交
601 602
            taosArrayPush(unassignedVgStash, pConsumerEp);
          }
L
Liu Jicong 已提交
603 604

        } else if (vgThisConsumerBeforeRb < vgThisConsumerAfterRb) {
L
Liu Jicong 已提交
605
          // assign from unassigned
L
Liu Jicong 已提交
606
          while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
L
Liu Jicong 已提交
607 608
            // if no unassgined, save j
            if (taosArrayGetSize(unassignedVgStash) == 0) {
L
Liu Jicong 已提交
609 610
              taosArrayPush(unassignedConsumerIdx, &j);
              unfished = true;
L
Liu Jicong 已提交
611 612
              break;
            }
L
Liu Jicong 已提交
613
            // assign vg to consumer
L
Liu Jicong 已提交
614 615 616 617 618 619 620 621
            SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
            ASSERT(pConsumerEp != NULL);
            pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
            pConsumerEp->consumerId = pSubConsumer->consumerId;
            taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
            // build msg and persist into trans
          }
        }
L
Liu Jicong 已提交
622 623

        if (changed && !unfished) {
L
Liu Jicong 已提交
624 625
          SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
          pRebConsumer->epoch++;
L
Liu Jicong 已提交
626 627 628 629 630 631
          if (vgThisConsumerAfterRb != 0) {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
          } else {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
          }
          SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
L
Liu Jicong 已提交
632
          sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
L
Liu Jicong 已提交
633
          mndTransAppendRedolog(pTrans, pConsumerRaw);
L
Liu Jicong 已提交
634 635
          mndReleaseConsumer(pMnode, pRebConsumer);
          // TODO: save history
L
Liu Jicong 已提交
636 637 638
        }
      }

L
Liu Jicong 已提交
639 640 641
      for (int32_t j = 0; j < taosArrayGetSize(unassignedConsumerIdx); j++) {
        bool            canUseLeft = imbalanceSolved < imbalanceVg;
        int32_t         consumerIdx = *(int32_t *)taosArrayGet(unassignedConsumerIdx, j);
L
Liu Jicong 已提交
642
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, consumerIdx);
L
Liu Jicong 已提交
643 644 645 646 647
        if (canUseLeft) imbalanceSolved++;
        // must use
        int32_t vgThisConsumerAfterRb = taosArrayGetSize(pSubConsumer->vgInfo) + canUseLeft;
        while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer + canUseLeft) {
          // assign vg to consumer
L
Liu Jicong 已提交
648 649 650 651 652 653 654
          SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
          ASSERT(pConsumerEp != NULL);
          pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
          pConsumerEp->consumerId = pSubConsumer->consumerId;
          taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
          // build msg and persist into trans
        }
L
Liu Jicong 已提交
655 656 657 658 659 660 661 662
        SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
        pRebConsumer->epoch++;
        atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
        SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
        sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
        mndTransAppendRedolog(pTrans, pConsumerRaw);
        mndReleaseConsumer(pMnode, pRebConsumer);
        // TODO: save history
L
Liu Jicong 已提交
663
      }
L
Liu Jicong 已提交
664
#endif
L
Liu Jicong 已提交
665 666 667

#if 0
    //update consumer status for the subscribption
S
Shengliang Guan 已提交
668
    for (int32_t i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
L
Liu Jicong 已提交
669 670
      SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
      int64_t        consumerId = pCEp->consumerId;
L
Liu Jicong 已提交
671 672 673 674 675
      if (pCEp->status != -1) {
        int32_t consumerHbStatus = atomic_fetch_add_32(&pCEp->consumerHbStatus, 1);
        if (consumerHbStatus < MND_SUBSCRIBE_REBALANCE_CNT) {
          continue;
        }
L
Liu Jicong 已提交
676
        // put consumer into lostConsumer
L
Liu Jicong 已提交
677 678
        SMqConsumerEp* lostConsumer = taosArrayPush(pSub->lostConsumer, pCEp);
        lostConsumer->qmsg = NULL;
L
Liu Jicong 已提交
679
        // put vg into unassigned
L
Liu Jicong 已提交
680 681 682 683 684
        taosArrayPush(pSub->unassignedVg, pCEp);
        // remove from assigned
        // TODO: swap with last one, reduce size and reset i
        taosArrayRemove(pSub->assigned, i);
        // remove from available consumer
S
Shengliang Guan 已提交
685
        for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
L
Liu Jicong 已提交
686
          if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) {
L
Liu Jicong 已提交
687 688 689 690 691
            taosArrayRemove(pSub->availConsumer, j);
            break;
          }
          // TODO: acquire consumer, set status to unavail
        }
L
Liu Jicong 已提交
692
#if 0
L
Liu Jicong 已提交
693 694
        SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId);
        pConsumer->epoch++;
L
Liu Jicong 已提交
695
        printf("current epoch %ld size %ld", pConsumer->epoch, pConsumer->topics->size);
L
Liu Jicong 已提交
696 697 698 699
        SSdbRaw* pRaw = mndConsumerActionEncode(pConsumer);
        sdbSetRawStatus(pRaw, SDB_STATUS_READY);
        sdbWriteNotFree(pMnode->pSdb, pRaw);
        mndReleaseConsumer(pMnode, pConsumer);
L
Liu Jicong 已提交
700
#endif
L
Liu Jicong 已提交
701 702
      }
    }
L
Liu Jicong 已提交
703 704 705 706 707 708 709 710
    // no available consumer, skip rebalance
    if (taosArrayGetSize(pSub->availConsumer) == 0) {
      continue;
    }
    taosArrayGet(pSub->availConsumer, 0);
    // rebalance condition1 : have unassigned vg
    // assign vg to a consumer, trying to find the least assigned one
    if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) {
L
Liu Jicong 已提交
711 712 713 714 715 716
      char *topic = NULL;
      char *cgroup = NULL;
      mndSplitSubscribeKey(pSub->key, &topic, &cgroup);

      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
L
Liu Jicong 已提交
717
      for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
718
        int64_t        consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx);
L
Liu Jicong 已提交
719 720
        pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);

L
Liu Jicong 已提交
721
        SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
L
Liu Jicong 已提交
722
        pCEp->oldConsumerId = pCEp->consumerId;
L
Liu Jicong 已提交
723 724 725
        pCEp->consumerId = consumerId;
        taosArrayPush(pSub->assigned, pCEp);

L
Liu Jicong 已提交
726
        SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
727
        pConsumer->epoch++;
L
Liu Jicong 已提交
728 729 730
        SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);
        sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
        sdbWrite(pMnode->pSdb, pConsumerRaw);
L
Liu Jicong 已提交
731 732
        mndReleaseConsumer(pMnode, pConsumer);

L
Liu Jicong 已提交
733 734 735
        void* msg;
        int32_t msgLen;
        mndBuildRebalanceMsg(&msg, &msgLen, pTopic, pCEp, cgroup, topic);
L
Liu Jicong 已提交
736 737 738

        // persist msg
        STransAction action = {0};
739
        action.epSet = pCEp->epSet;
L
Liu Jicong 已提交
740 741
        action.pCont = msg;
        action.contLen = sizeof(SMsgHead) + msgLen;
L
Liu Jicong 已提交
742 743 744
        action.msgType = TDMT_VND_MQ_SET_CONN;
        mndTransAppendRedoAction(pTrans, &action);

L
Liu Jicong 已提交
745
        // persist data
L
Liu Jicong 已提交
746
        SSdbRaw *pRaw = mndSubActionEncode(pSub);
L
Liu Jicong 已提交
747
        sdbSetRawStatus(pRaw, SDB_STATUS_READY);
L
Liu Jicong 已提交
748 749
        mndTransAppendRedolog(pTrans, pRaw);
      }
L
Liu Jicong 已提交
750

L
Liu Jicong 已提交
751 752 753
      if (mndTransPrepare(pMnode, pTrans) != 0) {
        mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
      }
L
Liu Jicong 已提交
754
      mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
755
      mndTransDrop(pTrans);
L
Liu Jicong 已提交
756 757
      tfree(topic);
      tfree(cgroup);
L
Liu Jicong 已提交
758
    }
L
Liu Jicong 已提交
759
    // rebalance condition2 : imbalance assignment
L
Liu Jicong 已提交
760 761 762
  }
  return 0;
}
L
Liu Jicong 已提交
763
#endif
L
Liu Jicong 已提交
764

L
Liu Jicong 已提交
765
#if 0
/*
 * NOTE: this function is compiled out (#if 0).
 *
 * Walks every vgroup in the sdb and, for each one whose dbUid matches the
 * topic's dbUid, converts the topic's physical plan into a task list and
 * pushes one SMqConsumerEp (consumerId == -1) onto pSub->unassignedVg.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_MND_UNSUPPORTED_TOPIC when the plan cannot be converted.
 *
 * Every vgroup handed out by sdbFetch is released again, and the iterator is
 * cancelled on the early error return, so no vgroup reference stays pinned.
 */
static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
  SSdb      *pSdb = pMnode->pSdb;
  SVgObj    *pVgroup = NULL;
  SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
  SArray    *pArray = NULL;
  SArray    *inner = taosArrayGet(pDag->pSubplans, 0);
  SSubplan  *plan = taosArrayGetP(inner, 0);
  SArray    *unassignedVg = pSub->unassignedVg;

  void *pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;
    // retarget the first subplan at this vgroup before generating its task
    plan->execNode.nodeId = pVgroup->vgId;
    plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup);

    if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
      terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
      mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
      // drop the reference on the current vgroup and stop the iteration cleanly
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      return -1;
    }

    SMqConsumerEp consumerEp = {0};
    consumerEp.status = 0;
    consumerEp.consumerId = -1;
    STaskInfo *pTaskInfo = taosArrayGet(pArray, 0);
    consumerEp.epSet = pTaskInfo->addr.epset;
    consumerEp.vgId = pTaskInfo->addr.nodeId;

    ASSERT(consumerEp.vgId == pVgroup->vgId);
    consumerEp.qmsg = strdup(pTaskInfo->msg->msg);
    taosArrayPush(unassignedVg, &consumerEp);
    // TODO: free taskInfo
    taosArrayDestroy(pArray);

    // the matching-vgroup path must release its reference as well
    sdbRelease(pSdb, pVgroup);
  }

  /*qDestroyQueryDag(pDag);*/
  return 0;
}
#endif
L
Liu Jicong 已提交
812

S
Shengliang Guan 已提交
813 814
/*
 * Build a TDMT_VND_MQ_SET_CONN message for the vgroup described by
 * pConsumerEp and append it to pTrans as a redo action.
 *
 * The message is an SMsgHead (contLen and vgId in network byte order)
 * followed by the encoded SMqSetCVgReq.
 *
 * pConsumerEp->oldConsumerId must be -1 (asserted).
 *
 * Returns 0 on success, in which case the message buffer has been handed to
 * the transaction action. Returns -1 when the vgroup cannot be acquired, when
 * allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY), or when the action
 * cannot be appended; the buffer is freed here in the failure cases.
 */
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
                                      const SMqConsumerEp *pConsumerEp) {
  ASSERT(pConsumerEp->oldConsumerId == -1);
  int32_t vgId = pConsumerEp->vgId;

  // Resolve the target vgroup first: without it there is nowhere to send the
  // message, and failing here avoids allocating a buffer only to free it.
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
  if (pVgObj == NULL) {
    // NOTE(review): mndAcquireVgroup presumably sets terrno on failure - confirm.
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.msgType = TDMT_VND_MQ_SET_CONN;
  mndReleaseVgroup(pMnode, pVgObj);

  SMqSetCVgReq req = {
      .vgId = vgId,
      .consumerId = pConsumerEp->consumerId,
      .sql = pTopic->sql,
      .logicalPlan = pTopic->logicalPlan,
      .physicalPlan = pTopic->physicalPlan,
      .qmsg = pConsumerEp->qmsg,
  };

  strcpy(req.cgroup, cgroup);
  strcpy(req.topicName, pTopic->name);

  // first pass with a NULL buffer only computes the encoded length
  int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
  void   *buf = malloc(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SMsgHead *pMsgHead = (SMsgHead *)buf;
  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(vgId);

  // the request body starts right after the message head
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSMqSetCVgReq(&abuf, &req);

  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    free(buf);
    return -1;
  }
  return 0;
}

/* Cleanup counterpart for the subscribe module; there is currently nothing to do. */
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;  // intentionally unused
}

/*
 * Serialize a subscribe object into a freshly allocated sdb raw record.
 *
 * Record layout: int32 length of the encoded object, the encoded bytes, then
 * MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record on success (terrno = TSDB_CODE_SUCCESS). On any
 * failure the raw record is freed and NULL is returned; terrno starts out as
 * TSDB_CODE_OUT_OF_MEMORY and is left at whatever the failing step reported.
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *encoded = NULL;
  int32_t encodedLen = tEncodeSubscribeObj(NULL, pSub);  // length-only pass
  int32_t rawSize = sizeof(int32_t) + encodedLen + MND_SUBSCRIBE_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  encoded = malloc(encodedLen);
  if (encoded == NULL) goto SUB_ENCODE_OVER;

  // the encoder advances its cursor, so keep `encoded` pointing at the start
  void *cursor = encoded;
  tEncodeSubscribeObj(&cursor, pSub);

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, encodedLen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, encoded, encodedLen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  tfree(encoded);
  if (terrno == TSDB_CODE_SUCCESS) {
    mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
    return pRaw;
  }

  mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

/*
 * Decode an sdb raw record (as written by mndSubActionEncode) into a new sdb
 * row holding an SMqSubscribeObj.
 *
 * Returns the row on success (terrno = TSDB_CODE_SUCCESS). Returns NULL on
 * failure: terrno is TSDB_CODE_SDB_INVALID_DATA_VER for an unexpected record
 * version, otherwise it starts out as TSDB_CODE_OUT_OF_MEMORY and is left at
 * whatever the failing step reported.
 *
 * All locals read by the cleanup path are initialised before the first
 * `goto`, so an early failure never touches an indeterminate pointer.
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void            *buf = NULL;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  int32_t          dataPos = 0;
  int32_t          tlen = 0;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver != MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  int32_t size = sizeof(SMqSubscribeObj);
  pRow = sdbAllocRow(size);
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = malloc(tlen + 1);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub) == NULL) {
    goto SUB_DECODE_OVER;
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  tfree(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    // pSub is still NULL when the failure happened before the row existed
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }

  return pRow;
}

/* Insert action for a subscribe object: emits a trace line and always returns 0. */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;  // not needed by this action

  mTrace("subscribe:%s, perform insert action", pSub->key);

  return 0;
}

/*
 * Delete action for a subscribe object: traces the call, then hands the
 * object to tDeleteSMqSubscribeObj. Always returns 0.
 */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;  // not needed by this action

  mTrace("subscribe:%s, perform delete action", pSub->key);
  tDeleteSMqSubscribeObj(pSub);

  return 0;
}

/*
 * Update action for a subscribe object: emits a trace line keyed by the old
 * object and always returns 0. Nothing is copied from pNewSub into pOldSub.
 */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  (void)pSdb;     // not needed by this action
  (void)pNewSub;  // not needed by this action

  mTrace("subscribe:%s, perform update action", pOldSub->key);

  return 0;
}

L
Liu Jicong 已提交
957
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
S
Shengliang Guan 已提交
958
  int32_t tlen = strlen(cgroup);
L
Liu Jicong 已提交
959
  memcpy(key, cgroup, tlen);
L
Liu Jicong 已提交
960
  key[tlen] = TMQ_SEPARATOR;
L
Liu Jicong 已提交
961
  strcpy(key + tlen + 1, topicName);
L
Liu Jicong 已提交
962
  return 0;
L
Liu Jicong 已提交
963 964
}

L
Liu Jicong 已提交
965
/*
 * Look up the subscribe object for (cgroup, topicName) in the sdb.
 *
 * Returns the acquired object, or NULL with terrno set to
 * TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when sdbAcquire finds nothing.
 * A non-NULL result is handed back with mndReleaseSubscribe.
 */
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
  char subKey[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(subKey, cgroup, topicName);

  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pFound = sdbAcquire(pSdb, SDB_SUBSCRIBE, subKey);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
976 977 978 979
/*
 * Look up a subscribe object in the sdb by its ready-made key.
 *
 * Returns the acquired object, or NULL with terrno set to
 * TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when sdbAcquire finds nothing.
 */
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
985 986 987 988 989 990 991 992 993 994 995
/* Hand a subscribe object obtained from the sdb back via sdbRelease. */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  sdbRelease(pMnode->pSdb, pSub);
}

/*
 * Handle a consumer's subscribe request.
 *
 * The request carries the consumer id, its consumer group and the complete
 * list of topics it now wants. The new list is sorted and merged against the
 * consumer's previous (already sorted) list:
 *   - topics in both lists are left alone;
 *   - topics only in the old list are cancelled: the consumer's vgroups go
 *     back to the subscription's unassigned pool and the topic is recorded in
 *     recentRemovedTopics;
 *   - topics only in the new list are joined: the subscription is created if
 *     missing, the consumer is added to it, and one unassigned vgroup (if any)
 *     is handed to the consumer.
 * The resulting subscription and consumer objects are appended to a
 * transaction as redo logs.
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS once the transaction is prepared,
 * or -1 on failure.
 *
 * Ownership: the topic-name array from the request becomes the consumer's
 * currentTopics; the previous array is destroyed before returning. If the
 * request fails before the array is attached, it is destroyed here.
 */
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
  SMnode         *pMnode = pMsg->pMnode;
  char           *msgStr = pMsg->rpcMsg.pCont;
  SCMSubscribeReq subscribe;
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
  int64_t consumerId = subscribe.consumerId;
  char   *cgroup = subscribe.consumerGroup;

  SArray *newSub = subscribe.topicNames;
  int32_t newTopicNum = subscribe.topicNum;

  taosArraySortString(newSub, taosArrayCompareString);

  SArray  *oldSub = NULL;
  int32_t  oldTopicNum = 0;
  bool     createConsumer = false;
  int32_t  code = -1;
  SSdbRaw *pConsumerRaw = NULL;

  // Create the transaction before touching the consumer, so that a failure
  // here leaves the consumer object exactly as it was.
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
  if (pTrans == NULL) {
    if (newSub != NULL) taosArrayDestroyEx(newSub, free);
    return -1;
  }

  // create consumer if not exist
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    pConsumer = mndCreateConsumer(consumerId, cgroup);
    if (pConsumer == NULL) {
      // NOTE(review): mndCreateConsumer presumably sets terrno on failure - confirm.
      mndTransDrop(pTrans);
      if (newSub != NULL) taosArrayDestroyEx(newSub, free);
      return -1;
    }
    createConsumer = true;
  } else {
    pConsumer->epoch++;
    oldSub = pConsumer->currentTopics;
  }
  pConsumer->currentTopics = newSub;

  if (oldSub != NULL) {
    oldTopicNum = taosArrayGetSize(oldSub);
  }

  // Merge-walk the two sorted topic lists.
  int32_t i = 0, j = 0;
  while (i < newTopicNum || j < oldTopicNum) {
    char *newTopicName = NULL;
    char *oldTopicName = NULL;
    if (i >= newTopicNum) {
      // only old topics remain: each of them is being unsubscribed
      oldTopicName = taosArrayGetP(oldSub, j);
      j++;
    } else if (j >= oldTopicNum) {
      newTopicName = taosArrayGetP(newSub, i);
      i++;
    } else {
      newTopicName = taosArrayGetP(newSub, i);
      oldTopicName = taosArrayGetP(oldSub, j);

      // Topic names are plain NUL-terminated strings, and both lists were
      // ordered with the string comparator used above; the merge has to use
      // that same ordering.
      int32_t comp = strcmp(newTopicName, oldTopicName);
      if (comp == 0) {
        // subscribed before and still subscribed: nothing to do
        i++;
        j++;
        continue;
      } else if (comp < 0) {
        oldTopicName = NULL;
        i++;
      } else {
        newTopicName = NULL;
        j++;
      }
    }

    if (oldTopicName != NULL) {
      ASSERT(newTopicName == NULL);

      // cancel subscribe of old topic
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName);
      ASSERT(pSub);
      if (pSub != NULL) {
        int32_t csz = taosArrayGetSize(pSub->consumers);
        for (int32_t ci = 0; ci < csz; ci++) {
          SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci);
          if (pSubConsumer->consumerId == consumerId) {
            // give every vgroup of this consumer back to the unassigned pool
            int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
            for (int32_t vgi = 0; vgi < vgsz; vgi++) {
              SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
              mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp);
              taosArrayPush(pSub->unassignedVg, pConsumerEp);
            }
            taosArrayRemove(pSub->consumers, ci);
            break;
          }
        }
        // balance the acquire above
        mndReleaseSubscribe(pMnode, pSub);
      }

      char *oldTopicNameDup = strdup(oldTopicName);
      taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup);
      atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
    } else if (newTopicName != NULL) {
      ASSERT(oldTopicName == NULL);

      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
      if (pTopic == NULL) {
        mError("topic being subscribed not exist: %s", newTopicName);
        continue;
      }

      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName);
      bool             createSub = false;
      if (pSub == NULL) {
        mDebug("create new subscription by consumer %ld, group: %s, topic %s", consumerId, cgroup, newTopicName);
        pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
        if (pSub == NULL) {
          mndReleaseTopic(pMnode, pTopic);
          goto SUBSCRIBE_OVER;
        }
        createSub = true;

        mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg);
      }

      // The array stores a copy of this struct; the copy shares the vgInfo
      // array, so pushes through mqSubConsumer.vgInfo below are visible in it.
      SMqSubConsumer mqSubConsumer = {0};
      mqSubConsumer.consumerId = consumerId;
      mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp));
      taosArrayPush(pSub->consumers, &mqSubConsumer);

      // if have un assigned vg, assign one to the consumer
      if (taosArrayGetSize(pSub->unassignedVg) > 0) {
        SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
        pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
        pConsumerEp->consumerId = consumerId;
        taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
        if (pConsumerEp->oldConsumerId == -1) {
          mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, newTopicName,
                pConsumerEp->consumerId);
          mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
        } else {
          mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
        }
        // The consumer status is deliberately NOT set to ACTIVE here, so that
        // a rebalance is triggered at once.
      }

      SSdbRaw *pRaw = mndSubActionEncode(pSub);
      if (pRaw == NULL) {
        // do not prepare a transaction that is missing the subscription record
        if (!createSub) mndReleaseSubscribe(pMnode, pSub);
        mndReleaseTopic(pMnode, pTopic);
        goto SUBSCRIBE_OVER;
      }
      sdbSetRawStatus(pRaw, SDB_STATUS_READY);
      mndTransAppendRedolog(pTrans, pRaw);

      if (!createSub) mndReleaseSubscribe(pMnode, pSub);
      mndReleaseTopic(pMnode, pTopic);
    }
  }

  // persist consumerObj
  pConsumerRaw = mndConsumerActionEncode(pConsumer);
  if (pConsumerRaw == NULL) goto SUBSCRIBE_OVER;
  sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
  mndTransAppendRedolog(pTrans, pConsumerRaw);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto SUBSCRIBE_OVER;
  }

  code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

SUBSCRIBE_OVER:
  // the previous topic list was detached from the consumer above
  if (oldSub) taosArrayDestroyEx(oldSub, free);
  mndTransDrop(pTrans);
  if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
  return code;
}

L
Liu Jicong 已提交
1152 1153 1154 1155
/* Forward an internal response to the transaction layer; always returns 0. */
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);

  return 0;
}
L
Liu Jicong 已提交
1156 1157 1158 1159 1160

/* Cancel an in-progress sdb fetch identified by pIter. */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}