mndSubscribe.c 30.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndSubscribe.h"
L
Liu Jicong 已提交
18 19 20 21
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
22
#include "mndOffset.h"
L
Liu Jicong 已提交
23
#include "mndScheduler.h"
L
Liu Jicong 已提交
24 25 26 27 28 29 30 31 32
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

L
Liu Jicong 已提交
33
#define MND_SUBSCRIBE_VER_NUMBER   1
L
Liu Jicong 已提交
34 35
#define MND_SUBSCRIBE_RESERVE_SIZE 64

L
Liu Jicong 已提交
36
#define MND_SUBSCRIBE_REBALANCE_CNT 3
L
Liu Jicong 已提交
37

L
Liu Jicong 已提交
38 39 40 41 42
enum {
  MQ_SUBSCRIBE_STATUS__ACTIVE = 1,
  MQ_SUBSCRIBE_STATUS__DELETED,
};

L
Liu Jicong 已提交
43
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45 46 47 48 49 50
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);

S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeRsp(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalReq(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg);
static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg);
static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg);
static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg);
L
Liu Jicong 已提交
59

S
Shengliang Guan 已提交
60 61
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
                                      const SMqConsumerEp *pConsumerEp);
L
Liu Jicong 已提交
62

L
Liu Jicong 已提交
63 64
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp);

L
Liu Jicong 已提交
65 66 67 68 69 70 71 72 73 74
/**
 * Register the subscribe message handlers and the SDB_SUBSCRIBE sdb table.
 *
 * Both vnode responses (set-conn and rebalance) are routed to the same
 * internal-response handler.
 *
 * @param pMnode  owning mnode
 * @return the result of sdbSetTable()
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);

  SSdbTable subTable = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, subTable);
}

L
Liu Jicong 已提交
83
/**
 * Allocate a subscription object for (cgroup, topic) and initialise its
 * vgroup endpoints with mndSchedInitSubEp().
 *
 * @return the new object, or NULL on failure. terrno is set to
 *         TSDB_CODE_OUT_OF_MEMORY when the allocation fails; when
 *         mndSchedInitSubEp() fails, terrno is not touched here.
 */
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *cgroup) {
  SMqSubscribeObj *pNewSub = tNewSubscribeObj();
  if (pNewSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  char subKey[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(subKey, cgroup, pTopic->name);
  strcpy(pNewSub->key, subKey);

  if (mndSchedInitSubEp(pMnode, pTopic, pNewSub) >= 0) {
    // TODO: disable alter subscribed table
    return pNewSub;
  }

  // endpoint initialisation failed: release the members, then the object itself
  tDeleteSMqSubscribeObj(pNewSub);
  taosMemoryFree(pNewSub);
  return NULL;
}

L
Liu Jicong 已提交
103
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
L
Liu Jicong 已提交
104
  SMqMVRebReq req = {
L
Liu Jicong 已提交
105 106 107
      .vgId = pConsumerEp->vgId,
      .oldConsumerId = pConsumerEp->oldConsumerId,
      .newConsumerId = pConsumerEp->consumerId,
L
Liu Jicong 已提交
108 109
  };

L
Liu Jicong 已提交
110
  int32_t tlen = tEncodeSMqMVRebReq(NULL, &req);
wafwerar's avatar
wafwerar 已提交
111
  void   *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen);
L
Liu Jicong 已提交
112 113 114 115 116 117 118 119
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(pConsumerEp->vgId);
L
Liu Jicong 已提交
120

L
Liu Jicong 已提交
121
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
L
Liu Jicong 已提交
122
  tEncodeSMqMVRebReq(&abuf, &req);
L
Liu Jicong 已提交
123

L
Liu Jicong 已提交
124 125 126 127 128 129
  *pBuf = buf;
  *pLen = tlen;

  return 0;
}

L
Liu Jicong 已提交
130
/**
 * Build a TDMT_VND_MQ_REB request for pConsumerEp and append it to pTrans as a
 * redo action targeted at the endpoint's vgroup.
 *
 * The message buffer is freed here on every failure path; on success it is
 * handed to the transaction (not freed here).
 *
 * @return 0 on success, -1 on failure.
 */
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
  // only used when a vgroup moves away from an existing owner
  ASSERT(pConsumerEp->oldConsumerId != -1);

  void   *buf = NULL;
  int32_t tlen = 0;
  if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp) < 0) {
    return -1;
  }

  SVgObj *pVgObj = mndAcquireVgroup(pMnode, pConsumerEp->vgId);
  if (pVgObj == NULL) {
    // The vgroup is gone: do not pass NULL to mndGetVgroupEpset and do not leak buf.
    // NOTE(review): relies on mndAcquireVgroup having set terrno — confirm.
    mError("mq rebalance: vgroup %d not found", pConsumerEp->vgId);
    taosMemoryFree(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_REB;
  mndReleaseVgroup(pMnode, pVgObj);

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }
  return 0;
}

static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
  SMqSetCVgReq req = {0};
L
Liu Jicong 已提交
159
  req.consumerId = pConsumerEp->consumerId;
L
Liu Jicong 已提交
160 161

  int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
wafwerar's avatar
wafwerar 已提交
162
  void   *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen);
L
Liu Jicong 已提交
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(pConsumerEp->vgId);
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSMqSetCVgReq(&abuf, &req);
  *pBuf = buf;
  *pLen = tlen;
  return 0;
}

/**
 * Build a cancel-connection request for pConsumerEp and append it to pTrans as
 * a TDMT_VND_MQ_SET_CONN redo action targeted at the endpoint's vgroup.
 *
 * The message buffer is freed here on every failure path; on success it is
 * handed to the transaction (not freed here).
 *
 * @return 0 on success, -1 on failure.
 */
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
  void   *buf = NULL;
  int32_t tlen = 0;
  if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) {
    return -1;
  }

  SVgObj *pVgObj = mndAcquireVgroup(pMnode, pConsumerEp->vgId);
  if (pVgObj == NULL) {
    // The vgroup is gone: do not pass NULL to mndGetVgroupEpset and do not leak buf.
    // NOTE(review): relies on mndAcquireVgroup having set terrno — confirm.
    mError("mq cancel conn: vgroup %d not found", pConsumerEp->vgId);
    taosMemoryFree(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_SET_CONN;
  mndReleaseVgroup(pMnode, pVgObj);

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
203 204
static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
  SMnode           *pMnode = pMsg->pNode;
L
Liu Jicong 已提交
205
  SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
L
Liu Jicong 已提交
206
  SMqCMGetSubEpRsp  rsp = {0};
L
Liu Jicong 已提交
207
  int64_t           consumerId = be64toh(pReq->consumerId);
L
Liu Jicong 已提交
208
  int32_t           epoch = ntohl(pReq->epoch);
L
Liu Jicong 已提交
209

S
Shengliang Guan 已提交
210
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pNode, consumerId);
L
Liu Jicong 已提交
211
  if (pConsumer == NULL) {
L
Liu Jicong 已提交
212
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
L
Liu Jicong 已提交
213 214 215 216
    return -1;
  }
  ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);

L
Liu Jicong 已提交
217
  // TODO
L
Liu Jicong 已提交
218
  int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
219
  mTrace("try to get sub ep, old val: %d", hbStatus);
L
Liu Jicong 已提交
220 221 222 223 224
  atomic_store_32(&pConsumer->hbStatus, 0);
  /*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
  /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
  /*sdbWrite(pMnode->pSdb, pConsumerRaw);*/

L
Liu Jicong 已提交
225
  strcpy(rsp.cgroup, pReq->cgroup);
L
Liu Jicong 已提交
226 227
  if (epoch != pConsumer->epoch) {
    mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch);
L
Liu Jicong 已提交
228
    SArray *pTopics = pConsumer->currentTopics;
S
Shengliang Guan 已提交
229
    int32_t sz = taosArrayGetSize(pTopics);
L
Liu Jicong 已提交
230
    rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
S
Shengliang Guan 已提交
231
    for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
232 233 234
      char            *topicName = taosArrayGetP(pTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
      ASSERT(pSub);
S
Shengliang Guan 已提交
235
      int32_t csz = taosArrayGetSize(pSub->consumers);
L
Liu Jicong 已提交
236
      // TODO: change to bsearch
S
Shengliang Guan 已提交
237
      for (int32_t j = 0; j < csz; j++) {
L
Liu Jicong 已提交
238 239
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
        if (consumerId == pSubConsumer->consumerId) {
L
temp  
Liu Jicong 已提交
240 241
          int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
          mInfo("topic %s has %d vg", topicName, pConsumer->epoch);
L
Liu Jicong 已提交
242
          SMqSubTopicEp topicEp;
L
Liu Jicong 已提交
243
          strcpy(topicEp.topic, topicName);
L
Liu Jicong 已提交
244
          topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
S
Shengliang Guan 已提交
245
          for (int32_t k = 0; k < vgsz; k++) {
L
Liu Jicong 已提交
246
            char           offsetKey[TSDB_PARTITION_KEY_LEN];
L
Liu Jicong 已提交
247
            SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
L
Liu Jicong 已提交
248 249 250 251 252
            SMqSubVgEp     vgEp = {
                    .epSet = pConsumerEp->epSet,
                    .vgId = pConsumerEp->vgId,
                    .offset = -1,
            };
L
Liu Jicong 已提交
253 254 255 256 257 258
            mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId);
            SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
            if (pOffsetObj != NULL) {
              vgEp.offset = pOffsetObj->offset;
              mndReleaseOffset(pMnode, pOffsetObj);
            }
L
Liu Jicong 已提交
259 260 261 262 263
            taosArrayPush(topicEp.vgs, &vgEp);
          }
          taosArrayPush(rsp.topics, &topicEp);
          break;
        }
L
Liu Jicong 已提交
264
      }
L
Liu Jicong 已提交
265
      mndReleaseSubscribe(pMnode, pSub);
L
Liu Jicong 已提交
266 267
    }
  }
L
Liu Jicong 已提交
268
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
L
Liu Jicong 已提交
269
  void   *buf = rpcMallocCont(tlen);
L
Liu Jicong 已提交
270 271 272 273
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
L
Liu Jicong 已提交
274 275
  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = pConsumer->epoch;
L
Liu Jicong 已提交
276
  ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
L
Liu Jicong 已提交
277 278

  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
279
  tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
L
Liu Jicong 已提交
280
  tDeleteSMqCMGetSubEpRsp(&rsp);
L
Liu Jicong 已提交
281
  mndReleaseConsumer(pMnode, pConsumer);
S
Shengliang Guan 已提交
282 283
  pMsg->pRsp = buf;
  pMsg->rspLen = tlen;
L
Liu Jicong 已提交
284 285 286
  return 0;
}

L
Liu Jicong 已提交
287
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) {
S
Shengliang Guan 已提交
288
  int32_t i = 0;
L
Liu Jicong 已提交
289
  while (key[i] != TMQ_SEPARATOR) {
L
Liu Jicong 已提交
290 291
    i++;
  }
L
Liu Jicong 已提交
292 293 294
  memcpy(cgroup, key, i);
  cgroup[i] = 0;
  strcpy(topic, &key[i + 1]);
L
Liu Jicong 已提交
295 296 297
  return 0;
}

L
Liu Jicong 已提交
298
/**
 * Return the rebalance entry for key in pHash, inserting a fresh one if absent.
 *
 * pHash stores SMqRebSubscribe by value (taosHashPut copies the struct), so
 * the returned pointer is always the hash's own copy as returned by
 * taosHashGet. The key/array pointers inside are shared with that copy, which
 * is why the temporary heap struct from tNewSMqRebSubscribe can be freed right
 * after insertion instead of leaking.
 *
 * @return the entry, or NULL on allocation/insert failure (terrno set to
 *         TSDB_CODE_OUT_OF_MEMORY when creation fails).
 */
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t           keyLen = strlen(key) + 1;
  SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub != NULL) {
    return pRebSub;
  }

  SMqRebSubscribe *pNewRebSub = tNewSMqRebSubscribe(key);
  if (pNewRebSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  taosHashPut(pHash, key, keyLen, pNewRebSub, sizeof(SMqRebSubscribe));
  // only the shell is freed; its members now belong to the copy inside the hash
  taosMemoryFree(pNewRebSub);

  return taosHashGet(pHash, key, keyLen);
}

S
Shengliang Guan 已提交
311 312
/**
 * Periodic mq timer: scan all consumers and queue a rebalance if needed.
 *
 * For each consumer:
 *  - hbStatus is incremented; once it exceeds MND_SUBSCRIBE_REBALANCE_CNT an
 *    ACTIVE consumer is switched to LOST and recorded as a lost consumer of
 *    every topic it currently subscribes to;
 *  - an INIT consumer is recorded as a new consumer of its current topics;
 *  - a MODIFY consumer is recorded as removed from its recently removed
 *    topics, and that list is then freed and cleared.
 * If anything was recorded, a TDMT_MND_MQ_DO_REBALANCE message carrying the
 * collected hash is put on the write queue; otherwise the hash is discarded.
 *
 * @return 0, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the message could
 *         not be allocated.
 */
static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
  SMnode         *pMnode = pMsg->pNode;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = NULL;
  void           *pIter = NULL;

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
  if (pRebMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  if (pRebMsg->rebSubHash == NULL) {
    rpcFreeCont(pRebMsg);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) {
      int32_t old =
          atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
      if (old == MQ_CONSUMER_STATUS__ACTIVE) {
        // record the consumer as lost on every topic it subscribes to
        int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
        for (int32_t i = 0; i < sz; i++) {
          char *topic = taosArrayGetP(pConsumer->currentTopics, i);
          char  key[TSDB_SUBSCRIBE_KEY_LEN];
          mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
          SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
          if (pRebSub == NULL) continue;  // allocation failure: skip this topic
          taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
        }
      }
    }

    int32_t status = atomic_load_32(&pConsumer->status);
    if (status == MQ_CONSUMER_STATUS__INIT || status == MQ_CONSUMER_STATUS__MODIFY) {
      SArray *rebSubs =
          (status == MQ_CONSUMER_STATUS__INIT) ? pConsumer->currentTopics : pConsumer->recentRemovedTopics;
      int32_t sz = taosArrayGetSize(rebSubs);
      for (int32_t i = 0; i < sz; i++) {
        char *topic = taosArrayGetP(rebSubs, i);
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
        SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
        if (pRebSub == NULL) continue;  // allocation failure: skip this topic
        if (status == MQ_CONSUMER_STATUS__INIT) {
          taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
        } else {
          taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
        }
      }

      if (status == MQ_CONSUMER_STATUS__MODIFY) {
        int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics);
        for (int32_t i = 0; i < removeSz; i++) {
          // elements are char* (read with taosArrayGetP above); taosArrayGet would
          // return the slot address inside the array buffer, which must not be freed
          char *topicName = taosArrayGetP(pConsumer->recentRemovedTopics, i);
          taosMemoryFree(topicName);
        }
        taosArrayClear(pConsumer->recentRemovedTopics);
      }
    }

    // give back the reference taken by sdbFetch
    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_MQ_DO_REBALANCE,
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
  }

  return 0;
}

S
Shengliang Guan 已提交
383 384
static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
  SMnode            *pMnode = pMsg->pNode;
S
Shengliang Guan 已提交
385 386
  SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont;
  STrans            *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
L
Liu Jicong 已提交
387 388 389 390 391 392 393 394 395
  void              *pIter = NULL;

  mInfo("mq rebalance start");

  while (1) {
    pIter = taosHashIterate(pReq->rebSubHash, pIter);
    if (pIter == NULL) break;
    SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
wafwerar's avatar
wafwerar 已提交
396
    taosMemoryFreeClear(pRebSub->key);
L
Liu Jicong 已提交
397 398 399 400

    mInfo("mq rebalance subscription: %s", pSub->key);

    // remove lost consumer
S
Shengliang Guan 已提交
401
    for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
L
Liu Jicong 已提交
402 403
      int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i);

404
      mInfo("mq remove lost consumer %" PRId64 "", lostConsumerId);
L
Liu Jicong 已提交
405

S
Shengliang Guan 已提交
406
      for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
L
Liu Jicong 已提交
407 408 409 410
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
        if (pSubConsumer->consumerId == lostConsumerId) {
          taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo);
          taosArrayPush(pSub->lostConsumers, pSubConsumer);
L
Liu Jicong 已提交
411 412 413 414 415 416 417 418
          taosArrayRemove(pSub->consumers, j);
          break;
        }
      }
    }

    // calculate rebalance
    int32_t consumerNum = taosArrayGetSize(pSub->consumers);
L
Liu Jicong 已提交
419 420 421
    if (consumerNum != 0) {
      int32_t vgNum = pSub->vgNum;
      int32_t vgEachConsumer = vgNum / consumerNum;
L
Liu Jicong 已提交
422 423 424
      int32_t imbalanceVg = vgNum % consumerNum;

      // iterate all consumers, set unassignedVgStash
S
Shengliang Guan 已提交
425
      for (int32_t i = 0; i < consumerNum; i++) {
L
Liu Jicong 已提交
426
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
S
Shengliang Guan 已提交
427 428
        int32_t         vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
        int32_t         vgThisConsumerAfterRb;
L
Liu Jicong 已提交
429 430 431 432
        if (i < imbalanceVg)
          vgThisConsumerAfterRb = vgEachConsumer + 1;
        else
          vgThisConsumerAfterRb = vgEachConsumer;
L
Liu Jicong 已提交
433

434
        mInfo("mq consumer:%" PRId64 ", connectted vgroup number change from %d to %d", pSubConsumer->consumerId,
L
Liu Jicong 已提交
435
              vgThisConsumerBeforeRb, vgThisConsumerAfterRb);
L
Liu Jicong 已提交
436

L
Liu Jicong 已提交
437
        while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
L
Liu Jicong 已提交
438 439 440
          SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
          ASSERT(pConsumerEp != NULL);
          ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
L
Liu Jicong 已提交
441
          taosArrayPush(pSub->unassignedVg, pConsumerEp);
L
Liu Jicong 已提交
442 443
        }

L
Liu Jicong 已提交
444 445 446 447 448
        SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
        int32_t         status = atomic_load_32(&pRebConsumer->status);
        if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
            (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
            (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
L
temp  
Liu Jicong 已提交
449 450 451 452 453
          SMqConsumerObj* pNewRebConsumer = taosMemoryMalloc(sizeof(SMqConsumerObj));
          ASSERT(pNewRebConsumer);
          memcpy(pNewRebConsumer, pRebConsumer, sizeof(SMqConsumerObj));
          pNewRebConsumer->currentTopics = taosArrayDup(pRebConsumer->currentTopics);
          pNewRebConsumer->recentRemovedTopics = taosArrayDup(pRebConsumer->recentRemovedTopics);
L
Liu Jicong 已提交
454
          if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {
L
temp  
Liu Jicong 已提交
455
            pNewRebConsumer->epoch++;
L
Liu Jicong 已提交
456
          }
L
Liu Jicong 已提交
457
          if (vgThisConsumerAfterRb != 0) {
L
temp  
Liu Jicong 已提交
458
            atomic_store_32(&pNewRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
L
Liu Jicong 已提交
459
          } else {
L
temp  
Liu Jicong 已提交
460
            atomic_store_32(&pNewRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
L
Liu Jicong 已提交
461
          }
L
Liu Jicong 已提交
462

L
temp  
Liu Jicong 已提交
463 464
          mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pNewRebConsumer->consumerId, status,
                pNewRebConsumer->status);
L
Liu Jicong 已提交
465

L
temp  
Liu Jicong 已提交
466
          SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pNewRebConsumer);
L
Liu Jicong 已提交
467 468 469 470
          sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
          mndTransAppendRedolog(pTrans, pConsumerRaw);
        }
        mndReleaseConsumer(pMnode, pRebConsumer);
L
Liu Jicong 已提交
471 472
      }

L
Liu Jicong 已提交
473
      // assign to vgroup
L
Liu Jicong 已提交
474
      if (taosArrayGetSize(pSub->unassignedVg) != 0) {
S
Shengliang Guan 已提交
475
        for (int32_t i = 0; i < consumerNum; i++) {
L
Liu Jicong 已提交
476
          SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
S
Shengliang Guan 已提交
477
          int32_t         vgThisConsumerAfterRb;
L
Liu Jicong 已提交
478 479 480 481
          if (i < imbalanceVg)
            vgThisConsumerAfterRb = vgEachConsumer + 1;
          else
            vgThisConsumerAfterRb = vgEachConsumer;
L
Liu Jicong 已提交
482

L
Liu Jicong 已提交
483 484
          while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
            SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
L
Liu Jicong 已提交
485 486 487 488
            ASSERT(pConsumerEp != NULL);

            pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
            pConsumerEp->consumerId = pSubConsumer->consumerId;
L
Liu Jicong 已提交
489
            taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
L
Liu Jicong 已提交
490

491
            if (pConsumerEp->oldConsumerId == -1) {
L
Liu Jicong 已提交
492 493 494
              char topic[TSDB_TOPIC_FNAME_LEN];
              char cgroup[TSDB_CGROUP_LEN];
              mndSplitSubscribeKey(pSub->key, topic, cgroup);
L
Liu Jicong 已提交
495
              SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
496

L
Liu Jicong 已提交
497 498
              mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId,
                    topic, pConsumerEp->consumerId, cgroup);
499 500 501 502

              mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
              mndReleaseTopic(pMnode, pTopic);
            } else {
L
Liu Jicong 已提交
503 504
              mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "",
                    pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId);
L
Liu Jicong 已提交
505

506 507
              mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
            }
L
Liu Jicong 已提交
508 509 510
          }
        }
      }
L
Liu Jicong 已提交
511
      ASSERT(taosArrayGetSize(pSub->unassignedVg) == 0);
L
Liu Jicong 已提交
512 513 514 515 516 517 518 519 520 521

      // TODO: log rebalance statistics
      SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
      sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
      mndTransAppendRedolog(pTrans, pSubRaw);
    }
    mndReleaseSubscribe(pMnode, pSub);
  }
  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
522
    taosHashCleanup(pReq->rebSubHash);
L
Liu Jicong 已提交
523 524 525 526
    mndTransDrop(pTrans);
    return -1;
  }

L
Liu Jicong 已提交
527
  taosHashCleanup(pReq->rebSubHash);
L
Liu Jicong 已提交
528 529 530 531
  mndTransDrop(pTrans);
  return 0;
}

S
Shengliang Guan 已提交
532 533
/**
 * Build a TDMT_VND_MQ_SET_CONN request that attaches consumer
 * pConsumerEp->consumerId to vgroup pConsumerEp->vgId for pTopic (carrying
 * the topic's sql / logical plan / physical plan and the endpoint's qmsg), and
 * append it to pTrans as a redo action.
 *
 * The message buffer is freed here on every failure path; on success it is
 * handed to the transaction (not freed here).
 *
 * @return 0 on success, -1 on failure (terrno = TSDB_CODE_OUT_OF_MEMORY on
 *         allocation failure).
 */
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
                                      const SMqConsumerEp *pConsumerEp) {
  // only for vgroups with no previous owner; moves go through mndPersistRebalanceMsg
  ASSERT(pConsumerEp->oldConsumerId == -1);
  int32_t vgId = pConsumerEp->vgId;

  SMqSetCVgReq req = {
      .vgId = vgId,
      .consumerId = pConsumerEp->consumerId,
      .sql = pTopic->sql,
      .logicalPlan = pTopic->logicalPlan,
      .physicalPlan = pTopic->physicalPlan,
      .qmsg = pConsumerEp->qmsg,
  };
  strcpy(req.cgroup, cgroup);
  strcpy(req.topicName, pTopic->name);

  int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
  void   *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SMsgHead *pMsgHead = (SMsgHead *)buf;
  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(vgId);

  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSMqSetCVgReq(&abuf, &req);

  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
  if (pVgObj == NULL) {
    // The vgroup is gone: do not pass NULL to mndGetVgroupEpset and do not leak buf.
    // NOTE(review): relies on mndAcquireVgroup having set terrno — confirm.
    mError("mq set conn: vgroup %d not found", vgId);
    taosMemoryFree(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_SET_CONN;
  mndReleaseVgroup(pMnode, pVgObj);

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }
  return 0;
}

void mndCleanupSubscribe(SMnode *pMnode) {
  // mndInitSubscribe allocates no module-level state, so there is nothing to release.
  (void)pMnode;
}

/**
 * Serialize a subscription into an SDB_SUBSCRIBE raw row.
 *
 * Row layout: int32 body length | tEncodeSubscribeObj() body |
 * MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
 *
 * @return the raw row, or NULL with terrno set on failure.
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void    *pObjBuf = NULL;
  int32_t  objLen = tEncodeSubscribeObj(NULL, pSub);  // size-only pass
  int32_t  rawSize = sizeof(int32_t) + objLen + MND_SUBSCRIBE_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto _ENCODE_END;

  pObjBuf = taosMemoryMalloc(objLen);
  if (pObjBuf == NULL) goto _ENCODE_END;

  void *pCursor = pObjBuf;
  tEncodeSubscribeObj(&pCursor, pSub);

  int32_t pos = 0;
  SDB_SET_INT32(pRaw, pos, objLen, _ENCODE_END);
  SDB_SET_BINARY(pRaw, pos, pObjBuf, objLen, _ENCODE_END);
  SDB_SET_RESERVE(pRaw, pos, MND_SUBSCRIBE_RESERVE_SIZE, _ENCODE_END);
  SDB_SET_DATALEN(pRaw, pos, _ENCODE_END);

  terrno = TSDB_CODE_SUCCESS;

_ENCODE_END:
  taosMemoryFreeClear(pObjBuf);
  if (terrno == TSDB_CODE_SUCCESS) {
    mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
    return pRaw;
  }

  mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

/**
 * Decode an SDB_SUBSCRIBE raw row (layout: int32 body length | body |
 * MND_SUBSCRIBE_RESERVE_SIZE reserved bytes) into a new sdb row.
 *
 * @return the row, or NULL with terrno set (TSDB_CODE_SDB_INVALID_DATA_VER on
 *         a soft-version mismatch).
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Initialised up front: several early gotos reach the error path before these
  // are assigned, and that path reads pSub and frees pRow.
  void            *buf = NULL;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver != MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  int32_t dataPos = 0;
  int32_t tlen = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub) == NULL) {
    goto SUB_DECODE_OVER;
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  return pRow;
}

/* sdb insert callback for SDB_SUBSCRIBE rows: only traces, always succeeds. */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = pSub->key;
  mTrace("subscribe:%s, perform insert action", subKey);
  return 0;
}

/* sdb delete callback for SDB_SUBSCRIBE rows: releases the object's contents
 * via tDeleteSMqSubscribeObj (the row memory itself is not freed here). */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = pSub->key;
  mTrace("subscribe:%s, perform delete action", subKey);
  tDeleteSMqSubscribeObj(pSub);
  return 0;
}

/*
 * sdb update callback for SDB_SUBSCRIBE rows: only traces, always succeeds.
 *
 * NOTE(review): pNewSub's contents are not copied into pOldSub here, so the
 * in-memory object is not refreshed from the new row — confirm this is
 * intended (the rebalance path mutates the acquired object in place).
 */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  const char *subKey = pOldSub->key;
  mTrace("subscribe:%s, perform update action", subKey);
  return 0;
}

L
Liu Jicong 已提交
676
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
S
Shengliang Guan 已提交
677
  int32_t tlen = strlen(cgroup);
L
Liu Jicong 已提交
678
  memcpy(key, cgroup, tlen);
L
Liu Jicong 已提交
679
  key[tlen] = TMQ_SEPARATOR;
L
Liu Jicong 已提交
680
  strcpy(key + tlen + 1, topicName);
L
Liu Jicong 已提交
681
  return 0;
L
Liu Jicong 已提交
682 683
}

L
Liu Jicong 已提交
684
/*
 * Acquire the subscription of consumer group `cgroup` on topic `topicName`.
 * Returns the object obtained from sdbAcquire, which should be released with
 * mndReleaseSubscribe. Returns NULL with
 * terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when no such row exists.
 */
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
  char subKey[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(subKey, cgroup, topicName);

  SMqSubscribeObj *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, subKey);
  if (pSub != NULL) {
    return pSub;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
695 696 697 698
/*
 * Acquire a subscription by its already-built sdb key.
 * Returns the acquired object, which should be released with
 * mndReleaseSubscribe. Returns NULL with
 * terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when no such row exists.
 */
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  SMqSubscribeObj *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  if (pSub != NULL) {
    return pSub;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
704 705 706 707 708
/*
 * Release a subscription reference obtained from mndAcquireSubscribe or
 * mndAcquireSubscribeByKey (forwards to sdbRelease on the mnode's sdb).
 */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  sdbRelease(pMnode->pSdb, pSub);
}

S
Shengliang Guan 已提交
709 710
/*
 * Handle a consumer's subscribe request.
 *
 * The consumer's topic list is replaced by the (sorted) list carried in the
 * request. The old and new lists are then walked in merge order:
 *  - a topic only in the old list: the consumer is detached from that topic's
 *    subscription, a cancel-conn request is persisted for each of its vgroups,
 *    and those vgroups are pushed back onto the subscription's unassignedVg;
 *  - a topic only in the new list: the consumer is added to the topic's
 *    subscription (creating the subscription and its offsets if needed) and,
 *    if a vgroup is unassigned, one is handed to this consumer;
 *  - a topic in both lists: nothing to do.
 * All persisted changes are appended to a single transaction.
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS once the transaction is prepared,
 * or -1 on failure.
 */
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
  SMnode         *pMnode = pMsg->pNode;
  char           *msgStr = pMsg->rpcMsg.pCont;
  SCMSubscribeReq subscribe;
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
  int64_t consumerId = subscribe.consumerId;
  char   *cgroup = subscribe.consumerGroup;

  SArray *newSub = subscribe.topicNames;
  int32_t newTopicNum = subscribe.topicNum;

  // Both lists must be sorted for the merge walk below.
  taosArraySortString(newSub, taosArrayCompareString);

  // Create the transaction before touching the consumer. A failure here then
  // leaves the sdb-owned consumer object unmodified and no acquired reference
  // is leaked.
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
  if (pTrans == NULL) {
    // TODO: free memory owned by `subscribe` (ownership of topicNames not visible here)
    return -1;
  }

  SArray *oldSub = NULL;
  int32_t oldTopicNum = 0;
  bool    createConsumer = false;

  // create consumer if not exist
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    pConsumer = mndCreateConsumer(consumerId, cgroup);
    if (pConsumer == NULL) {
      mError("consumer:%" PRId64 ", failed to create since %s", consumerId, terrstr());
      mndTransDrop(pTrans);
      return -1;
    }
    createConsumer = true;
  } else {
    pConsumer->epoch++;
    oldSub = pConsumer->currentTopics;
  }
  // The consumer takes over the request's topic array.
  pConsumer->currentTopics = newSub;

  if (oldSub != NULL) {
    oldTopicNum = taosArrayGetSize(oldSub);
  }

  int32_t i = 0, j = 0;
  while (i < newTopicNum || j < oldTopicNum) {
    char *newTopicName = NULL;
    char *oldTopicName = NULL;
    if (i >= newTopicNum) {
      // only old topics remain: they are being unsubscribed
      oldTopicName = taosArrayGetP(oldSub, j);
      j++;
    } else if (j >= oldTopicNum) {
      // only new topics remain: they are being subscribed
      newTopicName = taosArrayGetP(newSub, i);
      i++;
    } else {
      newTopicName = taosArrayGetP(newSub, i);
      oldTopicName = taosArrayGetP(oldSub, j);

      int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName);
      if (comp == 0) {
        // topic kept: nothing to do
        oldTopicName = newTopicName = NULL;
        i++;
        j++;
        continue;
      } else if (comp < 0) {
        oldTopicName = NULL;
        i++;
      } else {
        newTopicName = NULL;
        j++;
      }
    }

    if (oldTopicName != NULL) {
      ASSERT(newTopicName == NULL);

      // cancel subscribe of old topic
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName);
      ASSERT(pSub);
      // ASSERT may be compiled out; never dereference a missing subscription.
      if (pSub != NULL) {
        int32_t csz = taosArrayGetSize(pSub->consumers);
        for (int32_t ci = 0; ci < csz; ci++) {
          SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci);
          if (pSubConsumer->consumerId == consumerId) {
            int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
            for (int32_t vgi = 0; vgi < vgsz; vgi++) {
              SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
              mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp);
              taosArrayPush(pSub->unassignedVg, pConsumerEp);
            }
            // The endpoints were copied by value into unassignedVg, so the
            // per-consumer array can be freed before its entry is removed.
            // Otherwise it would leak.
            taosArrayDestroy(pSubConsumer->vgInfo);
            taosArrayRemove(pSub->consumers, ci);
            break;
          }
        }
        // NOTE(review): the modified pSub is not appended to pTrans here.
        // Confirm the change is persisted elsewhere.
        mndReleaseSubscribe(pMnode, pSub);
      }

      char *oldTopicNameDup = strdup(oldTopicName);
      taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup);
      atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
      /*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/
    } else if (newTopicName != NULL) {
      ASSERT(oldTopicName == NULL);

      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
      if (pTopic == NULL) {
        mError("topic being subscribed not exist: %s", newTopicName);
        continue;
      }

      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName);
      bool             createSub = false;
      if (pSub == NULL) {
        mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup,
               newTopicName);
        pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
        if (pSub == NULL) {
          mError("failed to create subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup,
                 newTopicName);
          mndReleaseTopic(pMnode, pTopic);
          continue;
        }
        createSub = true;

        mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg);
      }

      // vgInfo is a pointer, so the copy stored in pSub->consumers shares the
      // array that is filled below.
      SMqSubConsumer mqSubConsumer;
      mqSubConsumer.consumerId = consumerId;
      mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp));
      taosArrayPush(pSub->consumers, &mqSubConsumer);

      // if have un assigned vg, assign one to the consumer
      if (taosArrayGetSize(pSub->unassignedVg) > 0) {
        SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
        pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
        pConsumerEp->consumerId = consumerId;
        taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
        if (pConsumerEp->oldConsumerId == -1) {
          mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, newTopicName,
                pConsumerEp->consumerId);
          mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
        } else {
          mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
        }
        // to trigger rebalance at once, do not set status active
        /*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
      }

      SSdbRaw *pRaw = mndSubActionEncode(pSub);
      sdbSetRawStatus(pRaw, SDB_STATUS_READY);
      mndTransAppendRedolog(pTrans, pRaw);

      // TODO(review): a freshly created subscription (createSub) is never freed
      // here. Confirm who owns it after encoding.
      if (!createSub) mndReleaseSubscribe(pMnode, pSub);
      mndReleaseTopic(pMnode, pTopic);
    }
  }

  // oldSub's elements are heap strings; the array is no longer referenced.
  if (oldSub) taosArrayDestroyEx(oldSub, (void (*)(void *))taosMemoryFree);

  // persist consumerObj
  SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
  sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
  mndTransAppendRedolog(pTrans, pConsumerRaw);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  mndTransDrop(pTrans);
  if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang Guan 已提交
872
/*
 * Handle an internal subscribe response by passing it to mndTransProcessRsp.
 * That call's result is discarded; this handler always returns 0.
 */
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) {
  (void)mndTransProcessRsp(pRsp);
  return 0;
}
L
Liu Jicong 已提交
876 877 878 879 880

/*
 * Cancel the sdb fetch iteration identified by pIter
 * (forwards to sdbCancelFetch on the mnode's sdb).
 */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}