mndSubscribe.c 32.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndSubscribe.h"
L
Liu Jicong 已提交
18 19 20 21
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
22
#include "mndOffset.h"
L
Liu Jicong 已提交
23
#include "mndScheduler.h"
L
Liu Jicong 已提交
24 25 26 27 28 29 30 31 32
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

L
Liu Jicong 已提交
33
// Version stamped on encoded subscribe rows; mndSubActionDecode rejects any other version.
#define MND_SUBSCRIBE_VER_NUMBER   1
L
Liu Jicong 已提交
34 35
// Number of reserved bytes appended after the encoded subscribe object in each raw row.
#define MND_SUBSCRIBE_RESERVE_SIZE 64

L
Liu Jicong 已提交
36
// A consumer whose hbStatus counter exceeds this value in the mq timer is treated as lost.
#define MND_SUBSCRIBE_REBALANCE_CNT 3
L
Liu Jicong 已提交
37

L
Liu Jicong 已提交
38 39 40 41 42
// Subscription status values.
// NOTE(review): neither constant is referenced in the part of the file visible here — confirm usage.
enum {
  MQ_SUBSCRIBE_STATUS__ACTIVE = 1,
  MQ_SUBSCRIBE_STATUS__DELETED,
};

L
Liu Jicong 已提交
43
// Forward declaration. Callers in this file pass a TSDB_SUBSCRIBE_KEY_LEN buffer as `key`.
// Presumably produces "<cgroup><TMQ_SEPARATOR><topicName>", the layout mndSplitSubscribeKey parses — TODO confirm.
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45 46 47 48 49 50
// SDB table callbacks for SDB_SUBSCRIBE; they are installed into the SSdbTable in mndInitSubscribe.
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);

S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58
// Message handlers of this module. mndInitSubscribe registers mndProcessSubscribeReq,
// mndProcessSubscribeInternalRsp (for three vnode response types), mndProcessMqTimerMsg,
// mndProcessGetSubEpReq and mndProcessDoRebalanceMsg.
// NOTE(review): mndProcessSubscribeRsp, mndProcessSubscribeInternalReq and mndProcessResetOffsetReq
// are declared here but not registered in mndInitSubscribe — confirm whether they are still needed.
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeRsp(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalReq(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg);
static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg);
static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg);
static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg);
L
Liu Jicong 已提交
59

S
Shengliang Guan 已提交
60 61
// Appends a TDMT_VND_MQ_SET_CONN redo action for pConsumerEp's vgroup to pTrans.
// Returns 0 on success, -1 on failure.
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
                                      const SMqConsumerEp *pConsumerEp);
L
Liu Jicong 已提交
62

L
Liu Jicong 已提交
63 64 65 66
// Append a TDMT_VND_MQ_REB / TDMT_VND_MQ_CANCEL_CONN redo action for pConsumerEp's vgroup to pTrans.
// Both return 0 on success and -1 on failure.
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,
                                      const char *topicName);
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,
                                       const char *oldTopicName);
L
Liu Jicong 已提交
67

L
Liu Jicong 已提交
68 69 70 71 72 73 74 75 76 77
/*
 * Module initialisation: installs the message handlers served by this file and
 * registers the SDB_SUBSCRIBE table with its encode/decode/insert/update/delete callbacks.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  // Unassigned fields stay zero, exactly as with a designated initializer.
  SSdbTable subTable = {0};
  subTable.sdbType = SDB_SUBSCRIBE;
  subTable.keyType = SDB_KEY_BINARY;
  subTable.encodeFp = (SdbEncodeFp)mndSubActionEncode;
  subTable.decodeFp = (SdbDecodeFp)mndSubActionDecode;
  subTable.insertFp = (SdbInsertFp)mndSubActionInsert;
  subTable.updateFp = (SdbUpdateFp)mndSubActionUpdate;
  subTable.deleteFp = (SdbDeleteFp)mndSubActionDelete;

  // Client-facing subscribe request.
  mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);

  // All three vnode responses share a single handler.
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_CANCEL_CONN_RSP, mndProcessSubscribeInternalRsp);

  // Timer tick, endpoint query and the rebalance message.
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);

  return sdbSetTable(pMnode->pSdb, subTable);
}

L
Liu Jicong 已提交
87
/*
 * Allocates a subscribe object for (cgroup, pTopic), stores its key and lets the
 * scheduler initialise it through mndSchedInitSubEp().
 *
 * Returns the new object, or NULL on failure. terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY when the allocation itself fails.
 */
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *cgroup) {
  SMqSubscribeObj *pNewSub = tNewSubscribeObj();
  if (pNewSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Build the key in a scratch buffer first, then copy it into the object.
  char subKey[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(subKey, cgroup, pTopic->name);
  strcpy(pNewSub->key, subKey);

  if (mndSchedInitSubEp(pMnode, pTopic, pNewSub) >= 0) {
    // TODO: disable alter subscribed table
    return pNewSub;
  }

  // Scheduler initialisation failed: release the contents, then the object itself.
  tDeleteSMqSubscribeObj(pNewSub);
  taosMemoryFree(pNewSub);
  return NULL;
}

L
Liu Jicong 已提交
107 108
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp,
                                    const char *topicName) {
L
Liu Jicong 已提交
109
  SMqMVRebReq req = {
L
Liu Jicong 已提交
110 111 112
      .vgId = pConsumerEp->vgId,
      .oldConsumerId = pConsumerEp->oldConsumerId,
      .newConsumerId = pConsumerEp->consumerId,
L
Liu Jicong 已提交
113
  };
L
Liu Jicong 已提交
114
  req.topic = strdup(topicName);
L
Liu Jicong 已提交
115

L
Liu Jicong 已提交
116
  int32_t tlen = tEncodeSMqMVRebReq(NULL, &req);
wafwerar's avatar
wafwerar 已提交
117
  void   *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen);
L
Liu Jicong 已提交
118 119 120 121 122 123 124 125
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(pConsumerEp->vgId);
L
Liu Jicong 已提交
126

L
Liu Jicong 已提交
127
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
L
Liu Jicong 已提交
128
  tEncodeSMqMVRebReq(&abuf, &req);
L
Liu Jicong 已提交
129
  taosMemoryFree(req.topic);
L
Liu Jicong 已提交
130

L
Liu Jicong 已提交
131 132 133 134 135 136
  *pBuf = buf;
  *pLen = tlen;

  return 0;
}

L
Liu Jicong 已提交
137 138
/*
 * Builds a rebalance request for pConsumerEp and appends it to pTrans as a
 * TDMT_VND_MQ_REB redo action addressed to the endpoint's vgroup.
 *
 * Requires pConsumerEp->oldConsumerId != -1 (asserted).
 * Returns 0 on success, -1 on failure; the message buffer is freed on every failure path.
 */
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,
                                      const char *topicName) {
  ASSERT(pConsumerEp->oldConsumerId != -1);

  void   *buf;
  int32_t tlen;
  if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp, topicName) < 0) {
    return -1;
  }

  int32_t vgId = pConsumerEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
  if (pVgObj == NULL) {
    // Without the vgroup there is no epset to address; do not pass NULL on.
    // NOTE(review): terrno is presumably set by mndAcquireVgroup — confirm.
    taosMemoryFree(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_REB;

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
165 166
static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp,
                                     const char *oldTopicName) {
L
Liu Jicong 已提交
167
  SMqCancelConnReq req = {0};
L
Liu Jicong 已提交
168
  req.consumerId = pConsumerEp->consumerId;
L
Liu Jicong 已提交
169 170 171
  req.vgId = pConsumerEp->vgId;
  req.epoch = pConsumerEp->epoch;
  strcpy(req.topicName, oldTopicName);
L
Liu Jicong 已提交
172

L
Liu Jicong 已提交
173
  int32_t tlen = tEncodeSMqCancelConnReq(NULL, &req);
wafwerar's avatar
wafwerar 已提交
174
  void   *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen);
L
Liu Jicong 已提交
175 176 177 178 179 180 181 182 183
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(pConsumerEp->vgId);
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
L
Liu Jicong 已提交
184
  tEncodeSMqCancelConnReq(&abuf, &req);
L
Liu Jicong 已提交
185 186 187 188 189
  *pBuf = buf;
  *pLen = tlen;
  return 0;
}

L
Liu Jicong 已提交
190 191
/*
 * Builds a cancel-connection request for pConsumerEp and appends it to pTrans as a
 * TDMT_VND_MQ_CANCEL_CONN redo action addressed to the endpoint's vgroup.
 *
 * Returns 0 on success, -1 on failure; the message buffer is freed on every failure path.
 */
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,
                                       const char *oldTopicName) {
  void   *buf;
  int32_t tlen;
  if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp, oldTopicName) < 0) {
    return -1;
  }

  int32_t vgId = pConsumerEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
  if (pVgObj == NULL) {
    // Without the vgroup there is no epset to address; do not pass NULL on.
    // NOTE(review): terrno is presumably set by mndAcquireVgroup — confirm.
    taosMemoryFree(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_CANCEL_CONN;

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
216 217
/*
 * Handles TDMT_MND_GET_SUB_EP: a consumer asks for its current vgroup endpoints.
 *
 * The request carries the consumer id and the consumer's epoch. The consumer's
 * hbStatus counter is reset to 0. When the consumer's epoch differs from the
 * server-side epoch, the response lists, for every topic in
 * pConsumer->currentTopics, the vgroups assigned to this consumer together with the
 * stored offset (or -1 when no offset object exists). Otherwise the topic list is
 * left empty.
 *
 * On success the encoded response is stored in pMsg->pRsp / pMsg->rspLen and 0 is returned.
 * Returns -1 with terrno set when the consumer does not exist or an allocation fails.
 */
static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
  SMnode           *pMnode = pMsg->pNode;
  SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
  SMqCMGetSubEpRsp  rsp = {0};
  int64_t           consumerId = be64toh(pReq->consumerId);
  int32_t           epoch = ntohl(pReq->epoch);

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pNode, consumerId);
  if (pConsumer == NULL) {
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }
  // TODO add lock
  ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
  int32_t serverEpoch = pConsumer->epoch;

  // TODO
  int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
  mDebug("consumer %" PRId64 " epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch,
         serverEpoch, hbStatus);
  // Any request from the consumer counts as a heartbeat.
  atomic_store_32(&pConsumer->hbStatus, 0);
  /*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
  /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
  /*sdbWrite(pMnode->pSdb, pConsumerRaw);*/

  strcpy(rsp.cgroup, pReq->cgroup);
  if (epoch != serverEpoch) {
    mInfo("send new assignment to consumer %" PRId64 ", consumer epoch %d, server epoch %d", pConsumer->consumerId,
          epoch, serverEpoch);
    mDebug("consumer %" PRId64 " try r lock", consumerId);
    taosRLockLatch(&pConsumer->lock);
    mDebug("consumer %" PRId64 " r locked", consumerId);
    SArray *pTopics = pConsumer->currentTopics;
    int32_t sz = taosArrayGetSize(pTopics);
    rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      // Undo the lock and the consumer reference before bailing out.
      taosRUnLockLatch(&pConsumer->lock);
      mndReleaseConsumer(pMnode, pConsumer);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    for (int32_t i = 0; i < sz; i++) {
      char            *topicName = taosArrayGetP(pTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
      ASSERT(pSub);
      int32_t csz = taosArrayGetSize(pSub->consumers);
      // TODO: change to bsearch
      for (int32_t j = 0; j < csz; j++) {
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
        if (consumerId == pSubConsumer->consumerId) {
          int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
          // Log the vgroup count; the original passed serverEpoch here by mistake.
          mInfo("topic %s has %d vg", topicName, vgsz);

          SMqSubTopicEp topicEp;
          strcpy(topicEp.topic, topicName);

          SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topicName);
          ASSERT(pTopic != NULL);
          // NOTE(review): the schema is copied by assignment and the topic is released
          // right after — confirm the copy does not alias memory owned by the topic.
          topicEp.schema = pTopic->schema;
          mndReleaseTopic(pMnode, pTopic);

          topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
          for (int32_t k = 0; k < vgsz; k++) {
            char           offsetKey[TSDB_PARTITION_KEY_LEN];
            SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
            SMqSubVgEp     vgEp = {
                    .epSet = pConsumerEp->epSet,
                    .vgId = pConsumerEp->vgId,
                    .offset = -1,
            };
            // Report the stored offset when there is one; -1 otherwise.
            mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId);
            SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
            if (pOffsetObj != NULL) {
              vgEp.offset = pOffsetObj->offset;
              mndReleaseOffset(pMnode, pOffsetObj);
            }
            taosArrayPush(topicEp.vgs, &vgEp);
          }
          taosArrayPush(rsp.topics, &topicEp);
          break;
        }
      }
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
    mDebug("consumer %" PRId64 " r unlock", consumerId);
  }

  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    // Release what was built so far; both were leaked on this path before.
    tDeleteSMqCMGetSubEpRsp(&rsp);
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = serverEpoch;
  ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;

  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
  tDeleteSMqCMGetSubEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  pMsg->pRsp = buf;
  pMsg->rspLen = tlen;
  return 0;
}

L
Liu Jicong 已提交
316
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) {
S
Shengliang Guan 已提交
317
  int32_t i = 0;
L
Liu Jicong 已提交
318
  while (key[i] != TMQ_SEPARATOR) {
L
Liu Jicong 已提交
319 320
    i++;
  }
L
Liu Jicong 已提交
321 322 323
  memcpy(cgroup, key, i);
  cgroup[i] = 0;
  strcpy(topic, &key[i + 1]);
L
Liu Jicong 已提交
324 325 326
  return 0;
}

L
Liu Jicong 已提交
327
/*
 * Looks up the rebalance entry stored under `key` in pHash, creating it when absent.
 *
 * taosHashPut stores a copy of the SMqRebSubscribe struct, so the entry that lives in
 * the hash is a different object from the one tNewSMqRebSubscribe returned. This
 * function always returns the copy owned by the hash and frees the temporary shell,
 * which was leaked before.
 *
 * Returns the entry, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when it cannot be
 * created or stored.
 */
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t keyLen = strlen(key) + 1;

  SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub != NULL) {
    return pRebSub;
  }

  SMqRebSubscribe *pNew = tNewSMqRebSubscribe(key);
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  taosHashPut(pHash, key, keyLen, pNew, sizeof(SMqRebSubscribe));
  // Only the shell is released; the members it points to are now referenced by the hash copy.
  // NOTE(review): assumes tNewSMqRebSubscribe allocates the struct with the taosMemory
  // allocator, as tNewSubscribeObj's result is freed elsewhere in this file — confirm.
  taosMemoryFree(pNew);

  pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub == NULL) {
    // The put did not take effect.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return pRebSub;
}

S
Shengliang Guan 已提交
340 341
/*
 * Handles TDMT_MND_MQ_TIMER: scans all consumers and collects, per subscription key,
 * the consumers that need rebalancing.
 *
 *  - Each consumer's hbStatus counter is incremented; once it exceeds
 *    MND_SUBSCRIBE_REBALANCE_CNT an ACTIVE consumer is switched to LOST and recorded
 *    as a lost consumer for each of its current topics.
 *  - A consumer in INIT status is recorded as a new consumer for its current topics.
 *  - A consumer in MODIFY status is recorded as a removed consumer for its recently
 *    removed topics, after which that list is freed and cleared.
 *
 * If anything was collected, a TDMT_MND_MQ_DO_REBALANCE message carrying the hash is
 * put on the write queue; otherwise the hash and the message are released.
 *
 * Returns 0, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the message or the hash
 * cannot be allocated.
 */
static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
  SMnode         *pMnode = pMsg->pNode;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
  if (pRebMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  if (pRebMsg->rebSubHash == NULL) {
    rpcFreeCont(pRebMsg);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // NOTE(review): objects returned by sdbFetch are not released in this loop — confirm
  // whether sdbFetch takes a reference that must be dropped per iteration.
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) {
      // Only the caller that wins the ACTIVE -> LOST transition reports the consumer.
      int32_t old =
          atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
      if (old == MQ_CONSUMER_STATUS__ACTIVE) {
        // get all topics of that consumer
        int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
        for (int32_t i = 0; i < sz; i++) {
          char *topic = taosArrayGetP(pConsumer->currentTopics, i);
          char  key[TSDB_SUBSCRIBE_KEY_LEN];
          mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
          SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
          if (pRebSub == NULL) {
            mError("mq timer: failed to record lost consumer for %s since %s", key, terrstr());
            continue;
          }
          taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
        }
      }
    }

    int32_t status = atomic_load_32(&pConsumer->status);
    if (status == MQ_CONSUMER_STATUS__INIT || status == MQ_CONSUMER_STATUS__MODIFY) {
      SArray *rebSubs;
      if (status == MQ_CONSUMER_STATUS__INIT) {
        rebSubs = pConsumer->currentTopics;
      } else {
        rebSubs = pConsumer->recentRemovedTopics;
      }
      int32_t sz = taosArrayGetSize(rebSubs);
      for (int32_t i = 0; i < sz; i++) {
        char *topic = taosArrayGetP(rebSubs, i);
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
        SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
        if (pRebSub == NULL) {
          mError("mq timer: failed to record consumer change for %s since %s", key, terrstr());
          continue;
        }
        if (status == MQ_CONSUMER_STATUS__INIT) {
          taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
        } else if (status == MQ_CONSUMER_STATUS__MODIFY) {
          taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
        }
      }
      if (status == MQ_CONSUMER_STATUS__MODIFY) {
        // The removed-topic names have been consumed above; free them and empty the list.
        int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics);
        for (int32_t i = 0; i < removeSz; i++) {
          char *topicName = taosArrayGetP(pConsumer->recentRemovedTopics, i);
          taosMemoryFree(topicName);
        }
        taosArrayClear(pConsumer->recentRemovedTopics);
      }
    }
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_MQ_DO_REBALANCE,
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    // NOTE(review): the result of tmsgPutToQueue is not checked — confirm who owns
    // pRebMsg and the hash when enqueueing fails.
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
  }
  return 0;
}

S
Shengliang Guan 已提交
412 413
/*
 * Handles TDMT_MND_MQ_DO_REBALANCE: redistributes vgroups among the consumers of every
 * subscription listed in pReq->rebSubHash, inside one TRN_TYPE_REBALANCE transaction.
 *
 * For each subscription:
 *  1. consumers reported as lost are moved to pSub->lostConsumers and their vgroups
 *     are appended to pSub->unassignedVg;
 *  2. every remaining consumer is trimmed down to its target vgroup count
 *     (vgNum / consumerNum, plus one for the first vgNum % consumerNum consumers);
 *     the surplus goes to pSub->unassignedVg and the consumer's status is updated and
 *     appended to the transaction's commit log when it changes;
 *  3. unassigned vgroups are handed out until each consumer reaches its target; every
 *     hand-out appends either a set-conn action (vgroup had no previous consumer) or
 *     a rebalance action to the transaction;
 *  4. the updated subscription is appended to the redo log.
 *
 * Missing transaction, subscription, consumer or topic objects are logged and skipped
 * instead of being dereferenced.
 *
 * Returns 0 on success, -1 when the transaction cannot be created or prepared.
 */
static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
  SMnode            *pMnode = pMsg->pNode;
  SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont;
  STrans            *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
  void              *pIter = NULL;

  if (pTrans == NULL) {
    mError("mq rebalance: failed to create transaction since %s", terrstr());
    taosHashCleanup(pReq->rebSubHash);
    return -1;
  }

  mInfo("mq rebalance start");

  while (1) {
    pIter = taosHashIterate(pReq->rebSubHash, pIter);
    if (pIter == NULL) break;
    SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
    if (pSub == NULL) {
      mError("mq rebalance: subscription %s not found, skipped", pRebSub->key);
      taosMemoryFreeClear(pRebSub->key);
      continue;
    }
    taosMemoryFreeClear(pRebSub->key);

    mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum,
          (int32_t)taosArrayGetSize(pSub->unassignedVg));

    // remove lost consumer
    for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
      int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i);

      mInfo("mq remove lost consumer %" PRId64 "", lostConsumerId);

      for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
        if (pSubConsumer->consumerId == lostConsumerId) {
          // Its vgroups become unassigned; the entry itself is kept in lostConsumers.
          taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo);
          taosArrayPush(pSub->lostConsumers, pSubConsumer);
          taosArrayRemove(pSub->consumers, j);
          break;
        }
      }
    }

    // calculate rebalance
    int32_t consumerNum = taosArrayGetSize(pSub->consumers);
    if (consumerNum != 0) {
      int32_t vgNum = pSub->vgNum;
      int32_t vgEachConsumer = vgNum / consumerNum;
      int32_t imbalanceVg = vgNum % consumerNum;

      // iterate all consumers, set unassignedVgStash
      for (int32_t i = 0; i < consumerNum; i++) {
        SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
        int32_t         vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
        int32_t         vgThisConsumerAfterRb;
        // The first `imbalanceVg` consumers absorb the remainder, one vgroup each.
        if (i < imbalanceVg)
          vgThisConsumerAfterRb = vgEachConsumer + 1;
        else
          vgThisConsumerAfterRb = vgEachConsumer;

        mInfo("mq consumer:%" PRId64 ", connectted vgroup number change from %d to %d", pSubConsumer->consumerId,
              vgThisConsumerBeforeRb, vgThisConsumerAfterRb);

        while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
          SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
          ASSERT(pConsumerEp != NULL);
          ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
          taosArrayPush(pSub->unassignedVg, pConsumerEp);
          mDebug("mq rebalance: vg %d push to unassignedVg", pConsumerEp->vgId);
        }

        SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
        if (pRebConsumer == NULL) {
          mError("mq rebalance: consumer %" PRId64 " not found, status not updated", pSubConsumer->consumerId);
          continue;
        }
        mDebug("consumer %" PRId64 " try w lock", pRebConsumer->consumerId);
        taosWLockLatch(&pRebConsumer->lock);
        mDebug("consumer %" PRId64 " w locked", pRebConsumer->consumerId);
        int32_t status = atomic_load_32(&pRebConsumer->status);
        if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
            (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
            (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
          /*if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {*/
          /*pRebConsumer->epoch++;*/
          /*}*/
          if (vgThisConsumerAfterRb != 0) {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
          } else {
            atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
          }

          mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status,
                pRebConsumer->status);

          SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
          if (pConsumerRaw != NULL) {
            sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
            mndTransAppendCommitlog(pTrans, pConsumerRaw);
          } else {
            mError("mq rebalance: failed to encode consumer %" PRId64 " since %s", pRebConsumer->consumerId,
                   terrstr());
          }
        }
        taosWUnLockLatch(&pRebConsumer->lock);
        mDebug("consumer %" PRId64 " w unlock", pRebConsumer->consumerId);
        mndReleaseConsumer(pMnode, pRebConsumer);
      }

      // assign to vgroup
      if (taosArrayGetSize(pSub->unassignedVg) != 0) {
        for (int32_t i = 0; i < consumerNum; i++) {
          SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
          int32_t         vgThisConsumerAfterRb;
          if (i < imbalanceVg)
            vgThisConsumerAfterRb = vgEachConsumer + 1;
          else
            vgThisConsumerAfterRb = vgEachConsumer;

          while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
            SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
            // Check before the first dereference (the original logged vgId first).
            ASSERT(pConsumerEp != NULL);
            mDebug("mq rebalance: vg %d pop from unassignedVg", pConsumerEp->vgId);

            pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
            pConsumerEp->consumerId = pSubConsumer->consumerId;
            // TODO
            pConsumerEp->epoch = 0;
            taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);

            char topic[TSDB_TOPIC_FNAME_LEN];
            char cgroup[TSDB_CGROUP_LEN];
            mndSplitSubscribeKey(pSub->key, topic, cgroup);
            if (pConsumerEp->oldConsumerId == -1) {
              // The vgroup had no consumer before: set up a fresh connection.
              SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);

              mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId,
                    topic, pConsumerEp->consumerId, cgroup);

              if (pTopic == NULL) {
                mError("mq set conn: topic %s not found, vgroup %d not connected", topic, pConsumerEp->vgId);
              } else {
                if (mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp) != 0) {
                  mError("mq set conn: failed to persist request for vgroup %d since %s", pConsumerEp->vgId,
                         terrstr());
                }
                mndReleaseTopic(pMnode, pTopic);
              }
            } else {
              mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "",
                    pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId);

              if (mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, topic) != 0) {
                mError("mq rebalance: failed to persist request for vgroup %d since %s", pConsumerEp->vgId,
                       terrstr());
              }
            }
          }
        }
      }
      ASSERT(taosArrayGetSize(pSub->unassignedVg) == 0);

      // TODO: log rebalance statistics
      SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
      if (pSubRaw != NULL) {
        sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
        mndTransAppendRedolog(pTrans, pSubRaw);
      } else {
        mError("mq rebalance: failed to encode subscription %s since %s", pSub->key, terrstr());
      }
    }
    mndReleaseSubscribe(pMnode, pSub);
  }
  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    taosHashCleanup(pReq->rebSubHash);
    mndTransDrop(pTrans);
    return -1;
  }

  taosHashCleanup(pReq->rebSubHash);
  mndTransDrop(pTrans);
  return 0;
}

S
Shengliang Guan 已提交
566 567
/*
 * Builds an SMqSetCVgReq for pConsumerEp (topic sql, physical plan, qmsg, cgroup and
 * topic name) and appends it to pTrans as a TDMT_VND_MQ_SET_CONN redo action
 * addressed to the endpoint's vgroup.
 *
 * Requires pConsumerEp->oldConsumerId == -1 (asserted).
 * Returns 0 on success, -1 on failure; the message buffer is freed on every failure path.
 */
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
                                      const SMqConsumerEp *pConsumerEp) {
  ASSERT(pConsumerEp->oldConsumerId == -1);
  int32_t vgId = pConsumerEp->vgId;

  SMqSetCVgReq req = {
      .vgId = vgId,
      .consumerId = pConsumerEp->consumerId,
      .sql = pTopic->sql,
      .physicalPlan = pTopic->physicalPlan,
      .qmsg = pConsumerEp->qmsg,
  };

  strcpy(req.cgroup, cgroup);
  strcpy(req.topicName, pTopic->name);

  // The first pass with a NULL buffer only computes the encoded length.
  int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
  void   *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
  pMsgHead->vgId = htonl(vgId);

  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSMqSetCVgReq(&abuf, &req);

  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
  if (pVgObj == NULL) {
    // Without the vgroup there is no epset to address; do not pass NULL on.
    // NOTE(review): terrno is presumably set by mndAcquireVgroup — confirm.
    taosMemoryFree(buf);
    return -1;
  }

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = sizeof(SMsgHead) + tlen;
  action.msgType = TDMT_VND_MQ_SET_CONN;

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }
  return 0;
}

/* Module teardown hook. It currently performs no work. */
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;
}

/*
 * SDB encode callback: serialises a subscribe object into a raw row laid out as
 *   [int32 encoded length][encoded object][MND_SUBSCRIBE_RESERVE_SIZE reserved bytes].
 *
 * terrno starts as TSDB_CODE_OUT_OF_MEMORY and is switched to TSDB_CODE_SUCCESS only
 * after every field has been written, so any early exit is reported as a failure.
 *
 * Returns the raw row, or NULL on failure (the partially built row is freed).
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *encoded = NULL;
  int32_t encodedLen = tEncodeSubscribeObj(NULL, pSub);
  int32_t rawSize = sizeof(int32_t) + encodedLen + MND_SUBSCRIBE_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto ENCODE_DONE;

  // Encode into a scratch buffer first; it is copied into the raw row below.
  encoded = taosMemoryMalloc(encodedLen);
  if (encoded == NULL) goto ENCODE_DONE;

  void *cursor = encoded;
  tEncodeSubscribeObj(&cursor, pSub);

  int32_t offset = 0;
  SDB_SET_INT32(pRaw, offset, encodedLen, ENCODE_DONE);
  SDB_SET_BINARY(pRaw, offset, encoded, encodedLen, ENCODE_DONE);
  SDB_SET_RESERVE(pRaw, offset, MND_SUBSCRIBE_RESERVE_SIZE, ENCODE_DONE);
  SDB_SET_DATALEN(pRaw, offset, ENCODE_DONE);

  terrno = TSDB_CODE_SUCCESS;

ENCODE_DONE:
  taosMemoryFreeClear(encoded);
  if (terrno == TSDB_CODE_SUCCESS) {
    mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
    return pRaw;
  }

  mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

/*
 * Rebuild a subscribe object from an sdb raw record.
 *
 * The record must carry soft version MND_SUBSCRIBE_VER_NUMBER and the layout
 * written by mndSubActionEncode(): int32 payload length, payload, reserved
 * bytes.
 *
 * Returns the new row on success. On failure the row is freed, terrno is left
 * non-zero and NULL is returned.
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Declared and NULL-initialised before the first goto so the cleanup path
  // below never reads an indeterminate pointer.
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *buf = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver != MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  int32_t size = sizeof(SMqSubscribeObj);
  pRow = sdbAllocRow(size);
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  int32_t dataPos = 0;
  int32_t tlen = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub) == NULL) {
    goto SUB_DECODE_OVER;
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    // pSub is still NULL when the failure happened before the row existed.
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw,
           terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  return pRow;
}

/* sdb insert hook for subscribe objects: only traces the key, always returns 0. */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = pSub->key;
  mTrace("subscribe:%s, perform insert action", subKey);
  return 0;
}

/*
 * sdb delete hook for subscribe objects: traces the key, then hands the object
 * to tDeleteSMqSubscribeObj(). Always returns 0.
 */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = pSub->key;
  // Trace first: the key lives inside the object that is about to be torn down.
  mTrace("subscribe:%s, perform delete action", subKey);
  tDeleteSMqSubscribeObj(pSub);
  return 0;
}

/* sdb update hook for subscribe objects: only traces the old key, always returns 0. */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  const char *oldKey = pOldSub->key;
  mTrace("subscribe:%s, perform update action", oldKey);
  return 0;
}

L
Liu Jicong 已提交
709
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
S
Shengliang Guan 已提交
710
  int32_t tlen = strlen(cgroup);
L
Liu Jicong 已提交
711
  memcpy(key, cgroup, tlen);
L
Liu Jicong 已提交
712
  key[tlen] = TMQ_SEPARATOR;
L
Liu Jicong 已提交
713
  strcpy(key + tlen + 1, topicName);
L
Liu Jicong 已提交
714
  return 0;
L
Liu Jicong 已提交
715 716
}

L
Liu Jicong 已提交
717
/*
 * Look up the subscription of consumer group `cgroup` on topic `topicName`.
 *
 * Returns the object obtained from sdbAcquire(), or NULL with terrno set to
 * TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when the lookup yields nothing. A non-NULL
 * result is meant to be handed back through mndReleaseSubscribe().
 */
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
  char subKey[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(subKey, cgroup, topicName);

  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, subKey);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
728 729 730 731
/*
 * Look up a subscription by its already-built key.
 *
 * Returns the object obtained from sdbAcquire(), or NULL with terrno set to
 * TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when the lookup yields nothing.
 */
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
737 738 739 740 741
/* Hand a subscription obtained from one of the acquire helpers back to the sdb. */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  sdbRelease(pMnode->pSdb, pSub);
}

S
Shengliang Guan 已提交
742 743
/*
 * Handle a consumer's subscribe request.
 *
 * The request carries the consumer id, its consumer group and the full list of
 * topics the consumer now wants. The handler:
 *   1. acquires the consumer, or creates it when it does not exist yet;
 *   2. walks the sorted new and old topic lists in lock-step;
 *        - a topic only in the old list is cancelled: a cancel-conn request
 *          is queued for each of the consumer's vgroups, those vgroups go
 *          back to the subscription's unassigned list, and the topic is
 *          recorded in recentRemovedTopics;
 *        - a topic only in the new list is subscribed: the subscription is
 *          created if needed, the consumer is added to it, and one
 *          unassigned vgroup (if any) is handed over;
 *        - a topic in both lists is left alone;
 *   3. appends the consumer object to the transaction and prepares it.
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS once the transaction is prepared,
 * or -1 on failure.
 *
 * NOTE(review): a consumer or subscription created here (createConsumer /
 * createSub) is not freed on any path, same as before this change; confirm
 * who owns those objects.
 */
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
  SMnode         *pMnode = pMsg->pNode;
  char           *msgStr = pMsg->rpcMsg.pCont;
  SCMSubscribeReq subscribe;
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
  int64_t consumerId = subscribe.consumerId;
  char   *cgroup = subscribe.consumerGroup;

  SArray *newSub = subscribe.topicNames;
  int32_t newTopicNum = subscribe.topicNum;

  // The merge below relies on both topic lists being in the same order.
  taosArraySortString(newSub, taosArrayCompareString);

  SArray *oldSub = NULL;
  int32_t oldTopicNum = 0;
  bool    createConsumer = false;
  int32_t code = -1;
  STrans *pTrans = NULL;

  // create consumer if not exist
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    pConsumer = mndCreateConsumer(consumerId, cgroup);
    if (pConsumer == NULL) {
      // Do not report the stale "not exist" code left by the failed acquire.
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("consumer %" PRId64 ", failed to create consumer in group %s", consumerId, cgroup);
      return -1;
    }
    createConsumer = true;
  } else {
    pConsumer->epoch++;
    oldSub = pConsumer->currentTopics;
  }
  pConsumer->currentTopics = newSub;

  if (oldSub != NULL) {
    oldTopicNum = taosArrayGetSize(oldSub);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
  if (pTrans == NULL) {
    // The cleanup path still has to give back the acquired consumer.
    goto SUBSCRIBE_OVER;
  }

  int32_t i = 0, j = 0;
  while (i < newTopicNum || j < oldTopicNum) {
    char *newTopicName = NULL;
    char *oldTopicName = NULL;
    if (i >= newTopicNum) {
      // only old topics remain: each of them has to be cancelled
      oldTopicName = taosArrayGetP(oldSub, j);
      j++;
    } else if (j >= oldTopicNum) {
      // only new topics remain: each of them has to be subscribed
      newTopicName = taosArrayGetP(newSub, i);
      i++;
    } else {
      newTopicName = taosArrayGetP(newSub, i);
      oldTopicName = taosArrayGetP(oldSub, j);

      // NOTE(review): this comparator must agree with the ordering produced
      // by taosArrayCompareString above; left unchanged on purpose.
      int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName);
      if (comp == 0) {
        // topic is in both lists: nothing to do
        oldTopicName = newTopicName = NULL;
        i++;
        j++;
        continue;
      } else if (comp < 0) {
        oldTopicName = NULL;
        i++;
      } else {
        newTopicName = NULL;
        j++;
      }
    }

    if (oldTopicName != NULL) {
      ASSERT(newTopicName == NULL);

      // cancel subscribe of old topic
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName);
      ASSERT(pSub);
      // Guard as well, in case ASSERT is compiled out.
      if (pSub != NULL) {
        int32_t csz = taosArrayGetSize(pSub->consumers);
        for (int32_t ci = 0; ci < csz; ci++) {
          SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci);
          if (pSubConsumer->consumerId == consumerId) {
            int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
            for (int32_t vgi = 0; vgi < vgsz; vgi++) {
              SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
              mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp, oldTopicName);
              taosArrayPush(pSub->unassignedVg, pConsumerEp);
            }
            taosArrayRemove(pSub->consumers, ci);
            break;
          }
        }
        // Give back the reference taken by mndAcquireSubscribe().
        mndReleaseSubscribe(pMnode, pSub);
      }

      char *oldTopicNameDup = strdup(oldTopicName);
      taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup);
      atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
    } else if (newTopicName != NULL) {
      ASSERT(oldTopicName == NULL);

      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
      if (pTopic == NULL) {
        mError("topic being subscribed not exist: %s", newTopicName);
        continue;
      }

      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName);
      bool             createSub = false;
      if (pSub == NULL) {
        mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup,
               newTopicName);
        pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
        if (pSub == NULL) {
          mError("failed to create subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup,
                 newTopicName);
          mndReleaseTopic(pMnode, pTopic);
          goto SUBSCRIBE_OVER;
        }
        createSub = true;

        mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg);
      }

      // The copy pushed into pSub->consumers shares the vgInfo array with
      // this local, so the push into mqSubConsumer.vgInfo below is visible
      // through the subscription as well.
      SMqSubConsumer mqSubConsumer;
      mqSubConsumer.consumerId = consumerId;
      mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp));
      taosArrayPush(pSub->consumers, &mqSubConsumer);

      // if have un assigned vg, assign one to the consumer
      if (taosArrayGetSize(pSub->unassignedVg) > 0) {
        SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
        pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
        pConsumerEp->consumerId = consumerId;
        taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
        if (pConsumerEp->oldConsumerId == -1) {
          mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, newTopicName,
                pConsumerEp->consumerId);
          mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
        } else {
          mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, newTopicName);
        }
        // to trigger rebalance at once, do not set status active
      }

      SSdbRaw *pRaw = mndSubActionEncode(pSub);
      if (pRaw != NULL) {
        sdbSetRawStatus(pRaw, SDB_STATUS_READY);
        mndTransAppendRedolog(pTrans, pRaw);
      }

      if (!createSub) mndReleaseSubscribe(pMnode, pSub);
      mndReleaseTopic(pMnode, pTopic);

      // An encode failure must abort the request instead of preparing a
      // transaction that lacks this subscription's redo log.
      if (pRaw == NULL) goto SUBSCRIBE_OVER;
    }
  }

  // persist consumerObj
  SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
  if (pConsumerRaw == NULL) goto SUBSCRIBE_OVER;
  sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
  mndTransAppendRedolog(pTrans, pConsumerRaw);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto SUBSCRIBE_OVER;
  }

  code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

SUBSCRIBE_OVER:
  if (pTrans != NULL) mndTransDrop(pTrans);
  if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
  return code;
}

S
Shengliang Guan 已提交
905
/*
 * Response handler for the internal messages of a subscribe transaction: the
 * response is forwarded to mndTransProcessRsp() and 0 is always returned.
 */
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) {
  const int32_t handled = 0;
  mndTransProcessRsp(pRsp);
  return handled;
}
L
Liu Jicong 已提交
909 910 911 912 913

/* Cancel an in-progress sdb fetch identified by the iterator `pIter`. */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}