mndSubscribe.c 28.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndSubscribe.h"
L
Liu Jicong 已提交
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

#define MND_SUBSCRIBE_VER_NUMBER 1
#define MND_SUBSCRIBE_RESERVE_SIZE 64

L
Liu Jicong 已提交
33 34
#define MND_SUBSCRIBE_REBALANCE_MS 5000

L
Liu Jicong 已提交
35 36
static char *mndMakeSubscribeKey(char *cgroup, char *topicName);

L
Liu Jicong 已提交
37 38 39 40 41 42 43 44 45 46
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);

static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
47
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
48
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
L
Liu Jicong 已提交
49 50

static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
L
fix tq  
Liu Jicong 已提交
51
                                      SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub);
L
Liu Jicong 已提交
52 53 54 55 56 57 58 59 60 61 62

/*
 * Installs the subscribe module into the mnode: registers the message
 * handlers it serves and the SDB_SUBSCRIBE table callbacks.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  SSdbTable subTable = {0};
  subTable.sdbType = SDB_SUBSCRIBE;
  subTable.keyType = SDB_KEY_BINARY;
  subTable.encodeFp = (SdbEncodeFp)mndSubActionEncode;
  subTable.decodeFp = (SdbDecodeFp)mndSubActionDecode;
  subTable.insertFp = (SdbInsertFp)mndSubActionInsert;
  subTable.updateFp = (SdbUpdateFp)mndSubActionUpdate;
  subTable.deleteFp = (SdbDeleteFp)mndSubActionDelete;

  mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);

  return sdbSetTable(pMnode->pSdb, subTable);
}

L
Liu Jicong 已提交
69 70
/*
 * Handles TDMT_MND_GET_SUB_EP: reports to a consumer the vgroup endpoints
 * currently assigned to it for each topic it subscribes to.
 *
 * Side effects on each subscription of the consumer:
 *   - the consumer id is appended to availConsumer if it is not there yet;
 *   - lastConsumerHbTs of every endpoint assigned to this consumer is set to
 *     the current time, so the request also serves as a heartbeat;
 *   - the subscription row is re-encoded and written back to the SDB.
 *
 * On success the encoded response is stored in pMsg->pCont / pMsg->contLen and
 * 0 is returned. On failure terrno is set and -1 is returned.
 */
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
  SMnode           *pMnode = pMsg->pMnode;
  SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
  SMqCMGetSubEpRsp  rsp = {0};
  int64_t           consumerId = be64toh(pReq->consumerId);
  int64_t           currentTs = taosGetTimestampMs();
  int32_t           code = -1;
  int32_t           tlen = 0;
  void             *buf = NULL;
  void             *abuf = NULL;

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }
  ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);

  strcpy(rsp.cgroup, pReq->cgroup);
  rsp.consumerId = consumerId;
  rsp.epoch = pConsumer->epoch;

  SArray *pTopics = pConsumer->topics;
  int32_t sz = taosArrayGetSize(pTopics);
  rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
  if (rsp.topics == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto GET_SUB_EP_OVER;
  }

  for (int32_t i = 0; i < sz; i++) {
    SMqSubTopicEp     topicEp = {0};
    SMqConsumerTopic *pConsumerTopic = taosArrayGet(pTopics, i);
    strcpy(topicEp.topic, pConsumerTopic->name);

    SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, pConsumerTopic->name);
    ASSERT(pSub);

    // make sure the consumer is listed as available for this subscription
    bool found = 0;
    bool changed = 0;
    for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
      if (*(int64_t *)taosArrayGet(pSub->availConsumer, j) == consumerId) {
        found = 1;
        break;
      }
    }
    if (found == 0) {
      taosArrayPush(pSub->availConsumer, &consumerId);
    }

    // collect the endpoints assigned to this consumer and refresh its heartbeat
    int32_t assignedSz = taosArrayGetSize(pSub->assigned);
    topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp));
    for (int32_t j = 0; j < assignedSz; j++) {
      SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, j);
      if (pCEp->consumerId == consumerId) {
        pCEp->lastConsumerHbTs = currentTs;
        SMqSubVgEp vgEp = {.epSet = pCEp->epSet, .vgId = pCEp->vgId};
        taosArrayPush(topicEp.vgs, &vgEp);
        changed = 1;
      }
    }

    if (taosArrayGetSize(topicEp.vgs) != 0) {
      // ownership of topicEp.vgs moves into rsp.topics (shallow struct copy)
      taosArrayPush(rsp.topics, &topicEp);
    } else if (topicEp.vgs != NULL) {
      // nothing assigned for this topic: the array is not reported, so free it here
      taosArrayDestroy(topicEp.vgs);
    }

    // NOTE(review): the condition is kept as in the original; a consumer that was
    // just appended to availConsumer (found == 0) and owns no endpoint is not
    // persisted here - confirm whether `changed || !found` was intended.
    if (changed || found) {
      SSdbRaw *pRaw = mndSubActionEncode(pSub);
      if (pRaw != NULL) {
        sdbSetRawStatus(pRaw, SDB_STATUS_READY);
        sdbWrite(pMnode->pSdb, pRaw);
      }
    }
    mndReleaseSubscribe(pMnode, pSub);
  }

  tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
  buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto GET_SUB_EP_OVER;
  }
  abuf = buf;
  tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);

  pMsg->pCont = buf;
  pMsg->contLen = tlen;
  code = 0;

GET_SUB_EP_OVER:
  // the response has been serialized (or abandoned): free the temporary arrays
  if (rsp.topics != NULL) {
    int32_t topicNum = taosArrayGetSize(rsp.topics);
    for (int32_t i = 0; i < topicNum; i++) {
      SMqSubTopicEp *pTopicEp = taosArrayGet(rsp.topics, i);
      if (pTopicEp->vgs != NULL) taosArrayDestroy(pTopicEp->vgs);
    }
    taosArrayDestroy(rsp.topics);
  }
  // drop the reference taken by mndAcquireConsumer
  mndReleaseConsumer(pMnode, pConsumer);
  return code;
}

L
Liu Jicong 已提交
143 144 145 146 147 148
/*
 * Splits a subscribe key of the form "<cgroup>:<topic>" at its first ':'.
 *
 * key     NUL-terminated key; it is not modified.
 * topic   receives a newly allocated copy of the part after the ':'.
 * cgroup  receives a newly allocated copy of the part before the ':'.
 *
 * Returns 0 on success; the caller owns both strings and must free() them.
 * Returns -1 when key is NULL, contains no ':' or memory cannot be allocated;
 * in that case *topic and *cgroup are both NULL.
 */
static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
  *topic = NULL;
  *cgroup = NULL;
  if (key == NULL) return -1;

  // a key without a separator is malformed; never scan past the terminator
  const char *sep = strchr(key, ':');
  if (sep == NULL) return -1;

  size_t cgroupLen = (size_t)(sep - key);
  size_t topicLen = strlen(sep + 1);

  char *cgroupCopy = malloc(cgroupLen + 1);
  char *topicCopy = malloc(topicLen + 1);
  if (cgroupCopy == NULL || topicCopy == NULL) {
    free(cgroupCopy);
    free(topicCopy);
    return -1;
  }

  memcpy(cgroupCopy, key, cgroupLen);
  cgroupCopy[cgroupLen] = '\0';
  memcpy(topicCopy, sep + 1, topicLen + 1);  // includes the terminating NUL

  *cgroup = cgroupCopy;
  *topic = topicCopy;
  return 0;
}

static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
  SMnode          *pMnode = pMsg->pMnode;
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub);
L
Liu Jicong 已提交
160 161
  int64_t          currentTs = taosGetTimestampMs();
  int32_t          sz;
L
Liu Jicong 已提交
162
  while (pIter != NULL) {
L
Liu Jicong 已提交
163
    for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
L
Liu Jicong 已提交
164 165 166
      SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
      int64_t        consumerId = pCEp->consumerId;
      if (pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) {
L
Liu Jicong 已提交
167 168 169 170 171 172 173 174 175
        // put consumer into lostConsumer
        taosArrayPush(pSub->lostConsumer, pCEp);
        // put vg into unassgined
        taosArrayPush(pSub->unassignedVg, pCEp);
        // remove from assigned
        // TODO: swap with last one, reduce size and reset i
        taosArrayRemove(pSub->assigned, i);
        // remove from available consumer
        for (int j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
L
Liu Jicong 已提交
176
          if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) {
L
Liu Jicong 已提交
177 178 179 180 181
            taosArrayRemove(pSub->availConsumer, j);
            break;
          }
          // TODO: acquire consumer, set status to unavail
        }
L
Liu Jicong 已提交
182
#if 0
L
Liu Jicong 已提交
183 184
        SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId);
        pConsumer->epoch++;
L
Liu Jicong 已提交
185
        printf("current epoch %ld size %ld", pConsumer->epoch, pConsumer->topics->size);
L
Liu Jicong 已提交
186 187 188 189
        SSdbRaw* pRaw = mndConsumerActionEncode(pConsumer);
        sdbSetRawStatus(pRaw, SDB_STATUS_READY);
        sdbWriteNotFree(pMnode->pSdb, pRaw);
        mndReleaseConsumer(pMnode, pConsumer);
L
Liu Jicong 已提交
190
#endif
L
Liu Jicong 已提交
191 192 193
      }
    }
    if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0 && taosArrayGetSize(pSub->availConsumer) > 0) {
L
Liu Jicong 已提交
194 195 196 197 198 199 200 201
      char *topic = NULL;
      char *cgroup = NULL;
      mndSplitSubscribeKey(pSub->key, &topic, &cgroup);

      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);

      // create trans
      STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
L
Liu Jicong 已提交
202
      for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
203 204 205 206
        int64_t        consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx);
        SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
        pCEp->consumerId = consumerId;
        taosArrayPush(pSub->assigned, pCEp);
L
Liu Jicong 已提交
207
        pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
L
Liu Jicong 已提交
208

L
Liu Jicong 已提交
209
        SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
210 211 212 213 214 215
        pConsumer->epoch++;
        /*SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
        /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
        /*sdbWriteNotFree(pMnode->pSdb, pConsumerRaw);*/
        mndReleaseConsumer(pMnode, pConsumer);

L
Liu Jicong 已提交
216
        // build msg
L
Liu Jicong 已提交
217

L
Liu Jicong 已提交
218 219 220 221 222 223 224
        SMqSetCVgReq req = {0};
        strcpy(req.cgroup, cgroup);
        strcpy(req.topicName, topic);
        req.sql = pTopic->sql;
        req.logicalPlan = pTopic->logicalPlan;
        req.physicalPlan = pTopic->physicalPlan;
        req.qmsg = pCEp->qmsg;
L
Liu Jicong 已提交
225
        req.newConsumerId = consumerId;
L
Liu Jicong 已提交
226
        int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
L
Liu Jicong 已提交
227
        void   *buf = malloc(sizeof(SMsgHead) + tlen);
L
Liu Jicong 已提交
228
        if (buf == NULL) {
L
Liu Jicong 已提交
229 230 231
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return -1;
        }
L
Liu Jicong 已提交
232 233 234 235 236 237
        SMsgHead *pMsgHead = (SMsgHead *)buf;

        pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
        pMsgHead->vgId = htonl(pCEp->vgId);
        void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
        tEncodeSMqSetCVgReq(&abuf, &req);
L
Liu Jicong 已提交
238 239 240

        // persist msg
        STransAction action = {0};
241
        action.epSet = pCEp->epSet;
L
Liu Jicong 已提交
242
        action.pCont = buf;
L
Liu Jicong 已提交
243
        action.contLen = sizeof(SMsgHead) + tlen;
L
Liu Jicong 已提交
244 245 246 247 248
        action.msgType = TDMT_VND_MQ_SET_CONN;
        mndTransAppendRedoAction(pTrans, &action);

        // persist raw
        SSdbRaw *pRaw = mndSubActionEncode(pSub);
L
Liu Jicong 已提交
249
        sdbSetRawStatus(pRaw, SDB_STATUS_READY);
L
Liu Jicong 已提交
250 251 252 253 254 255 256 257
        mndTransAppendRedolog(pTrans, pRaw);

        tfree(topic);
        tfree(cgroup);
      }
      if (mndTransPrepare(pMnode, pTrans) != 0) {
        mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
      }
L
Liu Jicong 已提交
258
      mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
259 260
      mndTransDrop(pTrans);
    }
L
Liu Jicong 已提交
261
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
L
Liu Jicong 已提交
262 263 264 265
  }
  return 0;
}

L
Liu Jicong 已提交
266
/*
 * Fills `unassignedVg` with one SMqConsumerEp per vgroup of the topic's
 * database. For each such vgroup the topic's physical plan is bound to the
 * vgroup, converted to a task list (exactly one task is required) and the
 * task's endpoint set, node id and message are copied into the entry.
 *
 * Returns 0 on success. Returns -1 with terrno set when the plan cannot be
 * parsed or converted, yields other than one task, or memory runs out.
 */
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
  // convert phyplan to dag
  SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
  if (pDag == NULL) {
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
    return -1;
  }

  SArray   *pArray = NULL;
  SArray   *inner = taosArrayGet(pDag->pSubplans, 0);
  SSubplan *plan = taosArrayGetP(inner, 0);
  SSdb     *pSdb = pMnode->pSdb;
  SVgObj   *pVgroup = NULL;
  void     *pIter = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    plan->execNode.nodeId = pVgroup->vgId;
    plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup);

    if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
      terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
      mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
      goto INIT_VG_FAILED;
    }
    // a missing task list is as unusable as one with the wrong number of tasks
    if (pArray == NULL || taosArrayGetSize(pArray) != 1) {
      long planLevels = (pArray != NULL) ? (long)taosArrayGetSize(pArray) : 0;
      terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
      mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, planLevels);
      goto INIT_VG_FAILED;
    }

    STaskInfo    *pTaskInfo = taosArrayGet(pArray, 0);
    SMqConsumerEp CEp = {0};
    CEp.status = 0;
    CEp.consumerId = -1;
    CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
    CEp.epSet = pTaskInfo->addr.epset;
    CEp.vgId = pTaskInfo->addr.nodeId;

    ASSERT(CEp.vgId == pVgroup->vgId);
    CEp.qmsg = strdup(pTaskInfo->msg->msg);
    if (CEp.qmsg == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto INIT_VG_FAILED;
    }
    // the array stores a copy of CEp and takes over CEp.qmsg
    taosArrayPush(unassignedVg, &CEp);

    // TODO: free taskInfo
    taosArrayDestroy(pArray);
    pArray = NULL;  // do not hand a dangling pointer to the next conversion
    sdbRelease(pSdb, pVgroup);
  }

  // TODO: pDag is not destroyed; the original left qDestroyQueryDag(pDag) disabled.
  return 0;

INIT_VG_FAILED:
  if (pArray != NULL) taosArrayDestroy(pArray);
  sdbRelease(pSdb, pVgroup);
  sdbCancelFetch(pSdb, pIter);
  return -1;
}

static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
L
Liu Jicong 已提交
320
                                      SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp) {
L
Liu Jicong 已提交
321 322 323 324 325 326
  int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
  for (int32_t i = 0; i < sz; i++) {
    int32_t      vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i);
    SVgObj      *pVgObj = mndAcquireVgroup(pMnode, vgId);
    SMqSetCVgReq req = {
        .vgId = vgId,
L
Liu Jicong 已提交
327 328
        .oldConsumerId = -1,
        .newConsumerId = pConsumer->consumerId,
L
Liu Jicong 已提交
329
    };
L
Liu Jicong 已提交
330
    strcpy(req.cgroup, pConsumer->cgroup);
L
Liu Jicong 已提交
331
    strcpy(req.topicName, pTopic->name);
L
Liu Jicong 已提交
332 333 334
    req.sql = pTopic->sql;
    req.logicalPlan = pTopic->logicalPlan;
    req.physicalPlan = pTopic->physicalPlan;
L
Liu Jicong 已提交
335
    req.qmsg = pCEp->qmsg;
L
Liu Jicong 已提交
336
    int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
L
Liu Jicong 已提交
337 338
    void   *buf = malloc(sizeof(SMsgHead) + tlen);
    if (buf == NULL) {
L
Liu Jicong 已提交
339 340 341
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
L
Liu Jicong 已提交
342

L
Liu Jicong 已提交
343
    SMsgHead *pMsgHead = (SMsgHead *)buf;
L
Liu Jicong 已提交
344 345 346 347

    pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
    pMsgHead->vgId = htonl(vgId);

L
Liu Jicong 已提交
348
    void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
L
Liu Jicong 已提交
349
    tEncodeSMqSetCVgReq(&abuf, &req);
L
Liu Jicong 已提交
350 351 352

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
L
Liu Jicong 已提交
353
    action.pCont = buf;
L
Liu Jicong 已提交
354
    action.contLen = sizeof(SMsgHead) + tlen;
L
Liu Jicong 已提交
355 356 357 358
    action.msgType = TDMT_VND_MQ_SET_CONN;

    mndReleaseVgroup(pMnode, pVgObj);
    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
L
Liu Jicong 已提交
359
      free(buf);
L
Liu Jicong 已提交
360 361 362 363 364 365 366 367 368
      return -1;
    }
  }
  return 0;
}

/* Counterpart of mndInitSubscribe; this module currently has nothing to tear down. */
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;
}

/*
 * Serializes a subscription into a newly allocated SDB raw record laid out as
 *   int32 length | encoded SMqSubscribeObj | MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record, or NULL with terrno set on failure.
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void    *buf = NULL;
  int32_t  dataPos = 0;
  int32_t  tlen = tEncodeSubscribeObj(NULL, pSub);
  int32_t  size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  // encode into a scratch buffer first, then copy it into the raw record
  buf = malloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  void *abuf = buf;
  tEncodeSubscribeObj(&abuf, pSub);

  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  // the scratch buffer is no longer needed on either path
  // NOTE(review): assumes SDB_SET_BINARY copies the bytes into pRaw - confirm.
  free(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}

/*
 * Rebuilds a subscription row from an SDB raw record written by
 * mndSubActionEncode. The record must carry soft version
 * MND_SUBSCRIBE_VER_NUMBER.
 *
 * Returns the new row, or NULL with terrno set on failure.
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // initialized up front: every `goto SUB_DECODE_OVER` below reads these three
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *buf = NULL;
  int32_t          dataPos = 0;
  int32_t          tlen = 0;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver != MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  int32_t size = sizeof(SMqSubscribeObj);
  pRow = sdbAllocRow(size);
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = malloc(tlen + 1);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub) == NULL) {
    goto SUB_DECODE_OVER;
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  // NOTE(review): assumes tDecodeSubscribeObj copies what it needs out of buf,
  // so the scratch buffer can be released on success as well - confirm.
  free(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub != NULL ? pSub->key : "", pRaw, terrstr());
    // TODO free subscribeobj
    tfree(pRow);
    return NULL;
  }

  return pRow;
}

/* SDB insert callback for subscribe rows: only traces the key; always returns 0. */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform insert action", pSub->key);

  return 0;
}

/* SDB delete callback for subscribe rows: only traces the key; always returns 0. */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform delete action", pSub->key);

  return 0;
}

/*
 * SDB update callback for subscribe rows: only traces the old row's key and
 * always returns 0; nothing is copied from pNewSub into pOldSub here.
 */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  (void)pSdb;
  (void)pNewSub;
  mTrace("subscribe:%s, perform update action", pOldSub->key);

  return 0;
}

/* Placeholder: no request is built yet; always returns a null pointer. */
static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) {
  (void)pMnode;
  (void)topicName;
  (void)vgId;
  (void)consumerId;
  (void)cgroup;
  return NULL;
}

/*
 * Builds the subscribe key "<cgroup>:<topicName>" in a newly allocated buffer
 * of TSDB_SHOW_SUBQUERY_LEN bytes.
 *
 * Returns the key (caller must free() it), or NULL when an argument is NULL,
 * the key including its terminator does not fit the buffer, or the allocation
 * fails.
 */
static char *mndMakeSubscribeKey(char *cgroup, char *topicName) {
  if (cgroup == NULL || topicName == NULL) {
    return NULL;
  }

  size_t cgroupLen = strlen(cgroup);
  size_t topicLen = strlen(topicName);
  // cgroup + ':' + topic + '\0' must fit; refuse rather than write past the buffer
  if (cgroupLen + 1 + topicLen + 1 > (size_t)TSDB_SHOW_SUBQUERY_LEN) {
    return NULL;
  }

  char *key = malloc(TSDB_SHOW_SUBQUERY_LEN);
  if (key == NULL) {
    return NULL;
  }
  memcpy(key, cgroup, cgroupLen);
  key[cgroupLen] = ':';
  memcpy(key + cgroupLen + 1, topicName, topicLen + 1);  // includes the terminating NUL
  return key;
}

/*
 * Looks up the subscription of consumer group `cgroup` on topic `topicName`
 * in the SDB.
 *
 * Returns the object acquired through sdbAcquire (pair with
 * mndReleaseSubscribe), or NULL when the key cannot be built or sdbAcquire
 * returns nothing.
 */
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *cgroup, char *topicName) {
  SSdb *pSdb = pMnode->pSdb;
  char *key = mndMakeSubscribeKey(cgroup, topicName);
  if (key == NULL) {
    // never hand a NULL key to the SDB lookup
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
  free(key);
  return pSub;
}

/* Returns a subscription obtained from mndAcquireSubscribe to the SDB via sdbRelease. */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  sdbRelease(pMnode->pSdb, pSub);
}

/*
 * Handles TDMT_MND_SUBSCRIBE: a consumer announces the full list of topics it
 * wants to subscribe to.
 *
 * The consumer object is acquired, or created if unknown, and its epoch is
 * set/bumped. The sorted new topic list is then merged against the consumer's
 * previous topics:
 *   - topics present in both lists are left alone;
 *   - topics only in the old list should be unsubscribed (not implemented);
 *   - topics only in the new list get a subscription (created on demand), the
 *     consumer is added to its availConsumer list, and, if a vgroup was
 *     assigned right away, a TDMT_VND_MQ_SET_CONN action is queued.
 * All changes are collected in one transaction.
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS once the transaction is prepared,
 * -1 on failure. The transaction and the request's topic array are released
 * on every path.
 *
 * TODO: pTopic, a newly created pSub, pConsumerTopic and pConsumer are still
 * not released/freed here (the original left those releases disabled).
 */
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
  SMnode         *pMnode = pMsg->pMnode;
  char           *msgStr = pMsg->rpcMsg.pCont;
  SCMSubscribeReq subscribe;
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
  int64_t consumerId = subscribe.consumerId;
  char   *consumerGroup = subscribe.consumerGroup;

  SArray *newSub = subscribe.topicNames;
  int     newTopicNum = subscribe.topicNum;
  SArray *oldSub = NULL;
  int     oldTopicNum = 0;
  STrans *pTrans = NULL;
  int32_t code = -1;
  int     i = 0, j = 0;

  taosArraySortString(newSub, taosArrayCompareString);

  // create consumer if not exist
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    // zero-filled so that fields not set below are not encoded as garbage
    pConsumer = calloc(1, sizeof(SMqConsumerObj));
    if (pConsumer == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto SUBSCRIBE_OVER;
    }
    pConsumer->epoch = 1;
    pConsumer->consumerId = consumerId;
    strcpy(pConsumer->cgroup, consumerGroup);
    taosInitRWLatch(&pConsumer->lock);
  } else {
    pConsumer->epoch++;
    oldSub = pConsumer->topics;
  }
  pConsumer->topics = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));

  if (oldSub != NULL) {
    oldTopicNum = taosArrayGetSize(oldSub);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
  if (pTrans == NULL) {
    goto SUBSCRIBE_OVER;
  }

  // merge the sorted new list against the old one
  while (i < newTopicNum || j < oldTopicNum) {
    char *newTopicName = NULL;
    char *oldTopicName = NULL;
    if (i >= newTopicNum) {
      // encode unset topic msg to all vnodes related to that topic
      oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name;
      j++;
    } else if (j >= oldTopicNum) {
      newTopicName = taosArrayGetP(newSub, i);
      i++;
    } else {
      newTopicName = taosArrayGetP(newSub, i);
      oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name;

      // NOTE(review): newSub was sorted with taosArrayCompareString; confirm that
      // compareLenPrefixedStr uses the same ordering for these names.
      int comp = compareLenPrefixedStr(newTopicName, oldTopicName);
      if (comp == 0) {
        // NOTE(review): an unchanged topic is not copied into the new
        // pConsumer->topics array - confirm this is intended.
        i++;
        j++;
        continue;
      } else if (comp < 0) {
        oldTopicName = NULL;
        i++;
      } else {
        newTopicName = NULL;
        j++;
      }
    }

    if (oldTopicName != NULL) {
      // TODO: cancel the subscription of the old topic (send a reset message to
      // its vnodes and remove the consumer from the subscription).
      continue;
    }
    if (newTopicName == NULL) {
      continue;
    }

    // save subscribe info to mnode
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
    if (pTopic == NULL) {
      mError("topic being subscribed not exist: %s", newTopicName);
      continue;
    }

    SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName);
    bool             create = false;
    if (pSub == NULL) {
      mDebug("create new subscription, group: %s, topic %s", consumerGroup, newTopicName);
      pSub = tNewSubscribeObj();
      if (pSub == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto SUBSCRIBE_OVER;
      }
      char *key = mndMakeSubscribeKey(consumerGroup, newTopicName);
      if (key == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto SUBSCRIBE_OVER;
      }
      strcpy(pSub->key, key);
      free(key);
      // set unassigned vg
      if (mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg) < 0) {
        goto SUBSCRIBE_OVER;
      }
      // TODO: disable alter
      create = true;
    }
    taosArrayPush(pSub->availConsumer, &consumerId);

    SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
    if (pConsumerTopic == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto SUBSCRIBE_OVER;
    }
    taosArrayPush(pConsumer->topics, pConsumerTopic);

    if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
      ASSERT(taosArrayGetSize(pConsumerTopic->pVgInfo) == 1);
      int32_t        vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
      SMqConsumerEp *pCEp = taosArrayGetLast(pSub->assigned);
      if (pCEp != NULL && pCEp->vgId == vgId) {
        // send setmsg to vnode
        if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp) < 0) {
          goto SUBSCRIBE_OVER;
        }
      }
    }

    SSdbRaw *pRaw = mndSubActionEncode(pSub);
    if (pRaw == NULL) {
      goto SUBSCRIBE_OVER;
    }
    sdbSetRawStatus(pRaw, SDB_STATUS_READY);
    mndTransAppendRedolog(pTrans, pRaw);
    if (!create) mndReleaseSubscribe(pMnode, pSub);
  }

  // persist consumerObj
  SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
  if (pConsumerRaw == NULL) {
    goto SUBSCRIBE_OVER;
  }
  sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
  // TODO: error handling
  mndTransAppendRedolog(pTrans, pConsumerRaw);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto SUBSCRIBE_OVER;
  }
  code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

SUBSCRIBE_OVER:
  // the consumer already points at its new topic array, so the old one is orphaned
  if (oldSub) taosArrayDestroy(oldSub);
  if (newSub) taosArrayDestroy(newSub);
  if (pTrans != NULL) mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
732 733 734 735
/*
 * Handler registered for TDMT_VND_MQ_SET_CONN_RSP in mndInitSubscribe: passes
 * the response to mndTransProcessRsp and always returns 0.
 */
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);
  return 0;
}
L
Liu Jicong 已提交


/*
 * Meta retrieval stub: logs the requested name and returns 0 without filling
 * in any response.
 *
 * TODO: implement the meta lookup; the previous draft was compiled out with #if 0.
 */
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
  STableInfoReq *pInfo = pMsg->rpcMsg.pCont;

  mDebug("subscribe:%s, start to retrieve meta", pInfo->tableFname);

  return 0;
}

/*
 * Counts the consumer rows in the SDB.
 *
 * dbName must name an existing database, otherwise terrno is set to
 * TSDB_CODE_MND_DB_NOT_SELECTED and -1 is returned. The database is only
 * checked for existence: every consumer is counted, whatever its database.
 *
 * On success stores the count in *pNumOfConsumers and returns 0.
 */
static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) {
  SSdb *pSdb = pMnode->pSdb;

  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }
  // only existence matters; drop the reference right away so it is not leaked
  mndReleaseDb(pMnode, pDb);

  int32_t numOfConsumers = 0;
  void   *pIter = NULL;
  while (1) {
    SMqConsumerObj *pConsumer = NULL;
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    numOfConsumers++;

    sdbRelease(pSdb, pConsumer);
  }

  *pNumOfConsumers = numOfConsumers;
  return 0;
}

/*
 * Fills in the result-set schema of the consumer "show" query: four columns
 * (name, create_time, columns, tags) with their byte widths and offsets, the
 * row size, the row count taken from the SDB, and the table name derived from
 * pShow->type.
 *
 * Returns 0 on success, -1 when mndGetNumOfConsumers fails.
 */
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode *pMnode = pMsg->pMnode;
  SSdb   *pSdb = pMnode->pSdb;

  if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) {
    return -1;
  }

  // column layout, in output order
  const struct {
    const char *name;
    int32_t     bytes;
    int32_t     type;
  } columns[] = {
      {"name", TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, TSDB_DATA_TYPE_BINARY},
      {"create_time", 8, TSDB_DATA_TYPE_TIMESTAMP},
      {"columns", 4, TSDB_DATA_TYPE_INT},
      {"tags", 4, TSDB_DATA_TYPE_INT},
  };
  const int32_t cols = (int32_t)(sizeof(columns) / sizeof(columns[0]));

  SSchema *pSchema = pMeta->pSchema;
  for (int32_t c = 0; c < cols; ++c) {
    pShow->bytes[c] = columns[c].bytes;
    pSchema[c].type = columns[c].type;
    strcpy(pSchema[c].name, columns[c].name);
    pSchema[c].bytes = htonl(pShow->bytes[c]);
  }

  pMeta->numOfColumns = htonl(cols);
  pShow->numOfColumns = cols;

  // each column starts where the previous one ends
  pShow->offset[0] = 0;
  for (int32_t c = 1; c < cols; ++c) {
    pShow->offset[c] = pShow->offset[c - 1] + pShow->bytes[c - 1];
  }

  pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
  strcpy(pMeta->tbFname, mndShowStr(pShow->type));

  return 0;
}

/* Abandons an in-progress SDB iteration by passing pIter to sdbCancelFetch. */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}