mndTopic.c 22.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndTopic.h"
S
Shengliang Guan 已提交
17
#include "mndAuth.h"
L
Liu Jicong 已提交
18 19 20
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
21
#include "mndOffset.h"
L
Liu Jicong 已提交
22
#include "mndShow.h"
S
Shengliang Guan 已提交
23
#include "mndStb.h"
L
Liu Jicong 已提交
24
#include "mndSubscribe.h"
L
Liu Jicong 已提交
25 26 27
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
28
#include "parser.h"
L
Liu Jicong 已提交
29 30
#include "tname.h"

L
Liu Jicong 已提交
31
#define MND_TOPIC_VER_NUMBER   1
S
Shengliang Guan 已提交
32
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
33

L
Liu Jicong 已提交
34 35 36
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
37 38 39
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp);
L
Liu Jicong 已提交
40

S
Shengliang Guan 已提交
41
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
42
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
43

L
Liu Jicong 已提交
44 45
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);

L
Liu Jicong 已提交
46 47 48 49 50 51 52 53 54
int32_t mndInitTopic(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_TOPIC,
                     .keyType = SDB_KEY_BINARY,
                     .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
                     .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
                     .insertFp = (SdbInsertFp)mndTopicActionInsert,
                     .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndTopicActionDelete};

S
Shengliang Guan 已提交
55 56
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
H
Hongze Cheng 已提交
57
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
L
Liu Jicong 已提交
58

L
Liu Jicong 已提交
59
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
60
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
S
Shengliang Guan 已提交
61

L
Liu Jicong 已提交
62 63 64 65 66
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupTopic(SMnode *pMnode) {}

L
Liu Jicong 已提交
67 68 69 70 71
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
  //
  return strchr(topic, '.') + 1;
}

L
Liu Jicong 已提交
72
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
73 74
  terrno = TSDB_CODE_OUT_OF_MEMORY;

L
Liu Jicong 已提交
75 76 77 78 79 80 81
  void   *swBuf = NULL;
  int32_t physicalPlanLen = 0;
  if (pTopic->physicalPlan) {
    physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
  }
  int32_t schemaLen = 0;
  if (pTopic->schema.nCols) {
L
Liu Jicong 已提交
82
    schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
L
Liu Jicong 已提交
83
  }
L
fix  
Liu Jicong 已提交
84
  int32_t size =
L
Liu Jicong 已提交
85
      sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
S
Shengliang Guan 已提交
86
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
L
Liu Jicong 已提交
87
  if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
L
Liu Jicong 已提交
88 89

  int32_t dataPos = 0;
L
Liu Jicong 已提交
90
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
91 92 93 94 95 96
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
97 98 99 100
  SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
101 102

  SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
103 104
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
105
  SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
106 107 108
  if (pTopic->astLen) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
  }
L
Liu Jicong 已提交
109
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
110 111
  if (physicalPlanLen) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
112
  }
L
Liu Jicong 已提交
113
  SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
114 115 116 117 118 119 120 121 122 123
  if (schemaLen) {
    swBuf = taosMemoryMalloc(schemaLen);
    if (swBuf == NULL) {
      goto TOPIC_ENCODE_OVER;
    }
    void *aswBuf = swBuf;
    taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
    SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
  }

L
Liu Jicong 已提交
124
  SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
125

L
Liu Jicong 已提交
126 127 128 129 130 131
  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_ENCODE_OVER:
L
Liu Jicong 已提交
132
  if (swBuf) taosMemoryFree(swBuf);
L
Liu Jicong 已提交
133 134 135 136 137 138 139
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
140 141 142
  return pRaw;
}

L
Liu Jicong 已提交
143
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
144
  terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
145
  int8_t sver = 0;
146
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
147

S
Shengliang Guan 已提交
148
  if (sver != MND_TOPIC_VER_NUMBER) {
L
Liu Jicong 已提交
149
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
150
    goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
151 152
  }

L
Liu Jicong 已提交
153 154
  int32_t  size = sizeof(SMqTopicObj);
  SSdbRow *pRow = sdbAllocRow(size);
155 156
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

L
Liu Jicong 已提交
157
  SMqTopicObj *pTopic = sdbGetRowObj(pRow);
158
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
159

L
Liu Jicong 已提交
160
  int32_t len;
L
Liu Jicong 已提交
161
  int32_t dataPos = 0;
L
Liu Jicong 已提交
162
  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
163 164 165 166 167 168
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
169 170 171 172
  SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);
173

L
Liu Jicong 已提交
174 175
  SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER);

L
Liu Jicong 已提交
176 177 178 179 180 181
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
  if (pTopic->sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
L
Liu Jicong 已提交
182
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
183

L
Liu Jicong 已提交
184
  SDB_GET_INT32(pRaw, dataPos, &pTopic->astLen, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
185 186 187 188 189 190 191 192
  if (pTopic->astLen) {
    pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char));
    if (pTopic->ast == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
  } else {
    pTopic->ast = NULL;
193
  }
L
Liu Jicong 已提交
194
  SDB_GET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_DECODE_OVER);
195
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
196 197 198 199 200 201 202 203 204
  if (len) {
    pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
    if (pTopic->physicalPlan == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
  } else {
    pTopic->physicalPlan = NULL;
205
  }
206

L
Liu Jicong 已提交
207
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
208 209 210 211 212 213 214 215 216 217 218 219 220 221
  if (len) {
    void *buf = taosMemoryMalloc(len);
    if (buf == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
    if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
      goto TOPIC_DECODE_OVER;
    }
  } else {
    pTopic->schema.nCols = 0;
    pTopic->schema.sver = 0;
    pTopic->schema.pSchema = NULL;
L
Liu Jicong 已提交
222 223
  }

L
Liu Jicong 已提交
224 225
  SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);

L
Liu Jicong 已提交
226
  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
227

L
Liu Jicong 已提交
228
  terrno = TSDB_CODE_SUCCESS;
229 230

TOPIC_DECODE_OVER:
L
Liu Jicong 已提交
231
  if (terrno != TSDB_CODE_SUCCESS) {
232
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
233
    taosMemoryFreeClear(pRow);
234 235
    return NULL;
  }
L
Liu Jicong 已提交
236

237
  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
238 239 240
  return pRow;
}

L
Liu Jicong 已提交
241
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
242 243 244 245
  mTrace("topic:%s, perform insert action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
246
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
247 248 249 250
  mTrace("topic:%s, perform delete action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
251
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
L
Liu Jicong 已提交
252
  mTrace("topic:%s, perform update action", pOldTopic->name);
wafwerar's avatar
wafwerar 已提交
253
  atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
L
Liu Jicong 已提交
254 255
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

L
Liu Jicong 已提交
256 257 258
  atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt);

  /*taosWLockLatch(&pOldTopic->lock);*/
L
Liu Jicong 已提交
259 260

  // TODO handle update
L
Liu Jicong 已提交
261

L
Liu Jicong 已提交
262
  /*taosWUnLockLatch(&pOldTopic->lock);*/
L
Liu Jicong 已提交
263 264 265
  return 0;
}

L
Liu Jicong 已提交
266 267 268
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SSdb        *pSdb = pMnode->pSdb;
  SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
S
Shengliang Guan 已提交
269
  if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
L
Liu Jicong 已提交
270 271 272 273 274
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return pTopic;
}

L
Liu Jicong 已提交
275
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
276 277 278 279
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pTopic);
}

L
Liu Jicong 已提交
280
#if 0
L
Liu Jicong 已提交
281 282
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
  SName name = {0};
283
  tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
L
Liu Jicong 已提交
284

L
Liu Jicong 已提交
285
  char db[TSDB_TOPIC_FNAME_LEN] = {0};
L
Liu Jicong 已提交
286 287 288 289
  tNameGetFullDbName(&name, db);

  return mndAcquireDb(pMnode, db);
}
L
Liu Jicong 已提交
290
#endif
L
Liu Jicong 已提交
291

S
Shengliang Guan 已提交
292 293
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  int32_t contLen = sizeof(SDDropTopicReq);
L
Liu Jicong 已提交
294

wafwerar's avatar
wafwerar 已提交
295
  SDDropTopicReq *pDrop = taosMemoryCalloc(1, contLen);
L
Liu Jicong 已提交
296 297 298 299 300 301 302
  if (pDrop == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pDrop->head.contLen = htonl(contLen);
  pDrop->head.vgId = htonl(pVgroup->vgId);
L
Liu Jicong 已提交
303
  memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
L
Liu Jicong 已提交
304 305 306 307 308
  pDrop->tuid = htobe64(pTopic->uid);

  return pDrop;
}

L
Liu Jicong 已提交
309
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
L
Liu Jicong 已提交
310
  if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->subscribeDbName[0] == 0) {
L
Liu Jicong 已提交
311
    terrno = TSDB_CODE_MND_INVALID_TOPIC;
S
Shengliang Guan 已提交
312 313
    return -1;
  }
314

L
Liu Jicong 已提交
315
  return 0;
316 317
}

S
Shengliang Guan 已提交
318
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
L
Liu Jicong 已提交
319
  mDebug("topic:%s to create", pCreate->name);
L
Liu Jicong 已提交
320
  SMqTopicObj topicObj = {0};
321
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
322
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
323 324
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
L
Liu Jicong 已提交
325
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
L
Liu Jicong 已提交
326 327
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;
L
Liu Jicong 已提交
328 329
  topicObj.sql = strdup(pCreate->sql);
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
L
Liu Jicong 已提交
330
  topicObj.refConsumerCnt = 0;
L
Liu Jicong 已提交
331

L
Liu Jicong 已提交
332
  if (pCreate->ast && pCreate->ast[0]) {
L
Liu Jicong 已提交
333 334
    topicObj.ast = strdup(pCreate->ast);
    topicObj.astLen = strlen(pCreate->ast) + 1;
L
Liu Jicong 已提交
335
    topicObj.subType = TOPIC_SUB_TYPE__TABLE;
L
Liu Jicong 已提交
336 337
    topicObj.withTbName = pCreate->withTbName;
    topicObj.withSchema = pCreate->withSchema;
L
Liu Jicong 已提交
338

L
Liu Jicong 已提交
339 340
    SNode *pAst = NULL;
    if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
L
Liu Jicong 已提交
341 342
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
343 344 345
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      return -1;
    }
L
Liu Jicong 已提交
346

L
Liu Jicong 已提交
347
    SQueryPlan *pPlan = NULL;
L
Liu Jicong 已提交
348

L
Liu Jicong 已提交
349 350 351
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
352 353
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
354 355 356 357 358
      return -1;
    }

    if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
359 360
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
361 362 363 364 365
      return -1;
    }

    if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
366 367
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
368 369 370
      return -1;
    }
  } else {
L
Liu Jicong 已提交
371 372 373
    topicObj.ast = NULL;
    topicObj.astLen = 0;
    topicObj.physicalPlan = NULL;
L
Liu Jicong 已提交
374 375 376
    topicObj.subType = TOPIC_SUB_TYPE__DB;
    topicObj.withTbName = 1;
    topicObj.withSchema = 1;
L
Liu Jicong 已提交
377 378
  }

S
Shengliang Guan 已提交
379
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, pReq);
S
Shengliang Guan 已提交
380 381
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
382 383
    taosMemoryFreeClear(topicObj.ast);
    taosMemoryFreeClear(topicObj.sql);
L
Liu Jicong 已提交
384
    taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
385 386 387 388 389 390 391
    return -1;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

  SSdbRaw *pRedoRaw = mndTopicActionEncode(&topicObj);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
392
    taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
393 394 395 396 397 398 399
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
400
    taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
401 402 403 404
    mndTransDrop(pTrans);
    return -1;
  }

L
Liu Jicong 已提交
405
  taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
406 407
  mndTransDrop(pTrans);
  return 0;
L
Liu Jicong 已提交
408 409
}

S
Shengliang Guan 已提交
410 411
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
L
Liu Jicong 已提交
412 413 414 415 416 417
  int32_t           code = -1;
  SMqTopicObj      *pTopic = NULL;
  SDbObj           *pDb = NULL;
  SUserObj         *pUser = NULL;
  SCMCreateTopicReq createTopicReq = {0};

S
Shengliang Guan 已提交
418
  if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
S
Shengliang Guan 已提交
419 420 421
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_TOPIC_OVER;
  }
422 423

  mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
L
Liu Jicong 已提交
424

S
Shengliang Guan 已提交
425
  if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
426
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
S
Shengliang Guan 已提交
427
    goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
428 429
  }

S
Shengliang Guan 已提交
430
  pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
L
Liu Jicong 已提交
431
  if (pTopic != NULL) {
432 433
    if (createTopicReq.igExists) {
      mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
S
Shengliang Guan 已提交
434 435
      code = 0;
      goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
436 437
    } else {
      terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
S
Shengliang Guan 已提交
438
      goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
439
    }
S
Shengliang Guan 已提交
440 441
  } else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
    goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
442 443
  }

L
Liu Jicong 已提交
444
  pDb = mndAcquireDb(pMnode, createTopicReq.subscribeDbName);
L
Liu Jicong 已提交
445 446
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
S
Shengliang Guan 已提交
447 448 449
    goto CREATE_TOPIC_OVER;
  }

S
Shengliang Guan 已提交
450
  pUser = mndAcquireUser(pMnode, pReq->conn.user);
S
Shengliang Guan 已提交
451 452 453 454 455 456 457 458 459 460 461 462 463
  if (pUser == NULL) {
    goto CREATE_TOPIC_OVER;
  }

  if (mndCheckWriteAuth(pUser, pDb) != 0) {
    goto CREATE_TOPIC_OVER;
  }

  code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
  if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

CREATE_TOPIC_OVER:
  if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
464
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
L
Liu Jicong 已提交
465 466
  }

S
Shengliang Guan 已提交
467
  mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
468
  mndReleaseDb(pMnode, pDb);
S
Shengliang Guan 已提交
469
  mndReleaseUser(pMnode, pUser);
L
Liu Jicong 已提交
470

L
Liu Jicong 已提交
471
  tFreeSCMCreateTopicReq(&createTopicReq);
S
Shengliang Guan 已提交
472 473 474
  return code;
}

475
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
S
Shengliang Guan 已提交
476 477 478 479 480 481 482 483 484 485 486
  SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
L
Liu Jicong 已提交
487 488 489
    return -1;
  }

S
Shengliang Guan 已提交
490
  mndTransDrop(pTrans);
L
Liu Jicong 已提交
491
  return 0;
L
Liu Jicong 已提交
492 493
}

S
Shengliang Guan 已提交
494
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
L
Liu Jicong 已提交
495 496
  SMnode *pMnode = pReq->info.node;
  /*SSdb          *pSdb = pMnode->pSdb;*/
S
Shengliang Guan 已提交
497
  SMDropTopicReq dropReq = {0};
L
Liu Jicong 已提交
498

S
Shengliang Guan 已提交
499
  if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
S
Shengliang Guan 已提交
500
    terrno = TSDB_CODE_INVALID_MSG;
501
    return -1;
L
Liu Jicong 已提交
502 503
  }

504
  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
L
Liu Jicong 已提交
505 506 507 508 509 510 511 512 513 514
  if (pTopic == NULL) {
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
      return -1;
    }
  }
S
Shengliang Guan 已提交
515

L
Liu Jicong 已提交
516
  if (pTopic->refConsumerCnt != 0) {
517
    mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
518 519
    terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
S
Shengliang Guan 已提交
520 521 522
    return -1;
  }

523
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, pReq);
L
Liu Jicong 已提交
524 525 526 527
  if (pTrans == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    return -1;
  }
L
Liu Jicong 已提交
528

L
Liu Jicong 已提交
529 530 531 532 533
  mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);

  if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
    ASSERT(0);
    return -1;
L
Liu Jicong 已提交
534 535
  }

L
Liu Jicong 已提交
536 537 538 539 540 541
  if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
    ASSERT(0);
    return -1;
  }

  int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
L
Liu Jicong 已提交
542 543 544 545
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    terrno = code;
S
Shengliang Guan 已提交
546
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
L
Liu Jicong 已提交
547 548 549 550 551 552
    return -1;
  }

  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang Guan 已提交
553
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
554
  mndTransProcessRsp(pRsp);
L
Liu Jicong 已提交
555 556 557
  return 0;
}

L
Liu Jicong 已提交
558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t numOfTopics = 0;
  void   *pIter = NULL;
  while (1) {
    SMqTopicObj *pTopic = NULL;
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid == pDb->uid) {
      numOfTopics++;
    }

    sdbRelease(pSdb, pTopic);
  }

  *pNumOfTopics = numOfTopics;
  mndReleaseDb(pMnode, pDb);
  return 0;
}

S
Shengliang Guan 已提交
585 586
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode      *pMnode = pReq->info.node;
L
Liu Jicong 已提交
587 588 589
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      numOfRows = 0;
  SMqTopicObj *pTopic = NULL;
S
Shengliang Guan 已提交
590

L
Liu Jicong 已提交
591
  while (numOfRows < rowsCapacity) {
L
Liu Jicong 已提交
592 593 594
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) break;

L
Liu Jicong 已提交
595 596 597
    SColumnInfoData *pColInfo;
    SName            n;
    int32_t          cols = 0;
S
Shengliang Guan 已提交
598

L
Liu Jicong 已提交
599
    char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
L
Liu Jicong 已提交
600
    tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);
601 602
    tNameGetDbName(&n, varDataVal(topicName));
    varDataSetLen(topicName, strlen(varDataVal(topicName)));
L
Liu Jicong 已提交
603
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
604
    colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
L
Liu Jicong 已提交
605

L
Liu Jicong 已提交
606 607 608 609
    char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&n, varDataVal(dbName));
    varDataSetLen(dbName, strlen(varDataVal(dbName)));
L
Liu Jicong 已提交
610
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
611
    colDataAppend(pColInfo, numOfRows, (const char *)dbName, false);
L
Liu Jicong 已提交
612

L
Liu Jicong 已提交
613
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
614 615
    colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);

L
Liu Jicong 已提交
616 617
    char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
L
Liu Jicong 已提交
618
    varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
L
Liu Jicong 已提交
619
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
620
    colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
L
Liu Jicong 已提交
621 622 623 624 625

    numOfRows++;
    sdbRelease(pSdb, pTopic);
  }

626
  pShow->numOfRows += numOfRows;
L
Liu Jicong 已提交
627 628 629
  return numOfRows;
}

L
Liu Jicong 已提交
630 631 632 633 634 635 636 637 638
int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
  if (pRedoRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pRedoRaw) != 0) return -1;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;

  return 0;
}

L
Liu Jicong 已提交
639 640 641 642 643 644 645 646 647
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;

  return 0;
}

L
Liu Jicong 已提交
648 649 650 651
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}
L
Liu Jicong 已提交
652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676

int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = -1;
  SSdb   *pSdb = pMnode->pSdb;

  void        *pIter = NULL;
  SMqTopicObj *pTopic = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid != pDb->uid) {
      sdbRelease(pSdb, pTopic);
      continue;
    }

    if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
      goto END;
    }
  }

  code = 0;
END:
  return code;
}