mndTopic.c 18.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndTopic.h"
S
Shengliang Guan 已提交
17
#include "mndAuth.h"
L
Liu Jicong 已提交
18 19 20 21
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
22
#include "mndStb.h"
L
Liu Jicong 已提交
23 24 25
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
26
#include "parser.h"
L
Liu Jicong 已提交
27 28
#include "tname.h"

L
Liu Jicong 已提交
29
// Soft version written into every encoded topic row; the decoder rejects any other value.
#define MND_TOPIC_VER_NUMBER   1
// Number of reserved bytes appended to the tail of every encoded topic row.
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32 33 34
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
35 36 37 38
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
L
Liu Jicong 已提交
39
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
40 41 42 43 44 45 46 47 48 49

/*
 * Install the topic message handlers on the mnode and register the
 * SDB_TOPIC table (binary key) together with its encode/decode and
 * insert/update/delete hooks.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitTopic(SMnode *pMnode) {
  SSdbTable topicTable = {0};
  topicTable.sdbType = SDB_TOPIC;
  topicTable.keyType = SDB_KEY_BINARY;
  topicTable.encodeFp = (SdbEncodeFp)mndTopicActionEncode;
  topicTable.decodeFp = (SdbDecodeFp)mndTopicActionDecode;
  topicTable.insertFp = (SdbInsertFp)mndTopicActionInsert;
  topicTable.updateFp = (SdbUpdateFp)mndTopicActionUpdate;
  topicTable.deleteFp = (SdbDeleteFp)mndTopicActionDelete;

  // Request/response handlers.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);

  // The retrieve handler is currently not registered; only the iterator release hook is.
  //  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);

  return sdbSetTable(pMnode->pSdb, topicTable);
}

// Counterpart of mndInitTopic(); there is currently nothing to tear down.
void mndCleanupTopic(SMnode *pMnode) { (void)pMnode; }

L
Liu Jicong 已提交
62
/*
 * Serialize a topic object into a newly allocated SDB raw record.
 *
 * Field order: name, db, createTime, updateTime, uid, dbUid, version,
 * sqlLen + sql, astLen + ast, physicalPlanLen + physicalPlan (terminating
 * NUL included), schemaLen + encoded schema wrapper, followed by
 * MND_TOPIC_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record on success. On failure the partially built record
 * is released, NULL is returned and terrno holds a non-success code.
 */
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Declared before the first goto so the cleanup label always sees a defined value.
  void *swBuf = NULL;

  int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
  // A NULL destination makes the encoder only report the number of bytes it needs.
  int32_t schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
  int32_t size =
      sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
  if (pRaw == NULL) goto TOPIC_ENCODE_OVER;

  int32_t dataPos = 0;
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);

  // The schema wrapper is encoded into a scratch buffer first, then copied into the raw record.
  swBuf = taosMemoryMalloc(schemaLen);
  if (swBuf == NULL) {
    goto TOPIC_ENCODE_OVER;
  }
  void *aswBuf = swBuf;  // cursor handed to the encoder; swBuf keeps the start of the buffer
  taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
  SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);

  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_ENCODE_OVER:
  // The scratch buffer is only needed while encoding; release it on every path.
  taosMemoryFreeClear(swBuf);

  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
  return pRaw;
}

L
Liu Jicong 已提交
112
/*
 * Rebuild a topic row from an SDB raw record produced by mndTopicActionEncode().
 *
 * Records whose soft version differs from MND_TOPIC_VER_NUMBER are rejected
 * with TSDB_CODE_SDB_INVALID_DATA_VER.
 *
 * Returns the decoded row on success. On failure every buffer allocated here
 * is released, NULL is returned and terrno holds a non-success code.
 */
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Everything the cleanup label touches is initialised before the first goto,
  // so early failures never read or free an indeterminate pointer.
  SSdbRow     *pRow = NULL;
  SMqTopicObj *pTopic = NULL;
  void        *buf = NULL;
  int32_t      len = 0;
  int32_t      dataPos = 0;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;

  if (sver != MND_TOPIC_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto TOPIC_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqTopicObj));
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

  pTopic = sdbGetRowObj(pRow);
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;

  // Give the owned pointers a defined value so the failure path can free them unconditionally.
  pTopic->sql = NULL;
  pTopic->ast = NULL;
  pTopic->physicalPlan = NULL;

  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);

  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
  if (pTopic->sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);

  SDB_GET_INT32(pRaw, dataPos, &pTopic->astLen, TOPIC_DECODE_OVER);
  pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char));
  if (pTopic->ast == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_DECODE_OVER);

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
  if (pTopic->physicalPlan == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);

  // The encoded schema wrapper is copied into a scratch buffer and decoded from there.
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
  if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
    goto TOPIC_DECODE_OVER;
  }

  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_DECODE_OVER:
  // NOTE(review): assumes taosDecodeSSchemaWrapper copies out of buf rather than
  // keeping pointers into it - confirm before relying on this free.
  taosMemoryFreeClear(buf);

  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic != NULL ? pTopic->name : "", pRaw, terrstr());
    if (pTopic != NULL) {
      taosMemoryFreeClear(pTopic->sql);
      taosMemoryFreeClear(pTopic->ast);
      taosMemoryFreeClear(pTopic->physicalPlan);
      // NOTE(review): a schema decoded before a later failure is not released here;
      // SOURCE shows no destructor for it.
    }
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
  return pRow;
}

L
Liu Jicong 已提交
188
// SDB insert hook for topic rows: only traces the event and reports success.
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
  (void)pSdb;
  mTrace("topic:%s, perform insert action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
193
// SDB delete hook for topic rows: only traces the event and reports success.
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
  (void)pSdb;
  mTrace("topic:%s, perform delete action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
198
/*
 * SDB update hook for topic rows.
 *
 * Copies updateTime and version from the new row into the existing one with
 * atomic exchanges. The write latch is then taken and released around a
 * section that is still a TODO. Always returns 0.
 */
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
  (void)pSdb;
  mTrace("topic:%s, perform update action", pOldTopic->name);

  atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

  taosWLockLatch(&pOldTopic->lock);
  // TODO handle update
  taosWUnLockLatch(&pOldTopic->lock);

  return 0;
}

L
Liu Jicong 已提交
211 212 213
/*
 * Acquire the topic row stored under topicName.
 *
 * Returns the row, or NULL on failure. When the SDB reports
 * TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 * TSDB_CODE_MND_TOPIC_NOT_EXIST; any other terrno is left as is.
 * A non-NULL result is handed back through mndReleaseTopic().
 */
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SMqTopicObj *pFound = sdbAcquire(pMnode->pSdb, SDB_TOPIC, topicName);
  if (pFound != NULL) return pFound;

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return NULL;
}

L
Liu Jicong 已提交
220
// Hand a topic row obtained from mndAcquireTopic() back to the SDB.
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
  sdbRelease(pMnode->pSdb, pTopic);
}

/*
 * Acquire the database a topic belongs to.
 *
 * The topic name is parsed as an account/db/table name, the full database
 * name is extracted from it, and that database is looked up with
 * mndAcquireDb(). Returns whatever mndAcquireDb() returns.
 */
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
  char  dbFName[TSDB_TOPIC_FNAME_LEN] = {0};
  SName parsed = {0};

  // NOTE(review): the result of tNameFromString() is not checked here.
  tNameFromString(&parsed, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  tNameGetFullDbName(&parsed, dbFName);

  return mndAcquireDb(pMnode, dbFName);
}

S
Shengliang Guan 已提交
235 236
/*
 * Allocate and fill a drop-topic request addressed to one vgroup.
 *
 * The header length and vgroup id are stored via htonl() and the topic uid
 * via htobe64(). Returns the zero-initialised, filled message, or NULL with
 * terrno = TSDB_CODE_OUT_OF_MEMORY when allocation fails.
 */
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  (void)pMnode;

  int32_t         msgLen = sizeof(SDDropTopicReq);
  SDDropTopicReq *pMsg = taosMemoryCalloc(1, msgLen);
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Header first, then payload.
  pMsg->head.contLen = htonl(msgLen);
  pMsg->head.vgId = htonl(pVgroup->vgId);
  memcpy(pMsg->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
  pMsg->tuid = htobe64(pTopic->uid);

  return pMsg;
}

L
Liu Jicong 已提交
252
/*
 * Validate a create-topic request: both the topic name and the SQL text
 * must be present and non-empty.
 *
 * Returns 0 when valid, otherwise -1 with terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION.
 */
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
  const int hasName = (pCreate->name[0] != 0);
  const int hasSql = (pCreate->sql != NULL) && (pCreate->sql[0] != 0);

  if (hasName && hasSql) return 0;

  terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
  return -1;
}

L
Liu Jicong 已提交
260
#if 0
/*
 * Compiled out. Turns the request's AST string into a query-plan string:
 * parse the AST, build a topic query plan from it, then serialise the plan
 * into *pStr. A request without an AST succeeds without touching *pStr.
 * The resulting code is stored in terrno and returned.
 */
static int32_t mndGetPlanString(const SCMCreateTopicReq *pCreate, char **pStr) {
  if (pCreate->ast == NULL) return TSDB_CODE_SUCCESS;

  SNode      *pAst = NULL;
  SQueryPlan *pPlan = NULL;

  int32_t code = nodesStringToNode(pCreate->ast, &pAst);
  if (code == TSDB_CODE_SUCCESS) {
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
  }
  if (code == TSDB_CODE_SUCCESS) {
    code = nodesNodeToString(pPlan, false, pStr, NULL);
  }

  nodesDestroyNode(pAst);
  nodesDestroyNode(pPlan);
  terrno = code;
  return code;
}
#endif
284

S
Shengliang Guan 已提交
285
/*
 * Build a topic object from a create request and submit it through a
 * TRN_TYPE_CREATE_TOPIC transaction.
 *
 * Steps: fill the object from the request and the owning database, parse the
 * AST, build the topic query plan, extract the result schema, serialise the
 * plan, then append the encoded object as a redo log (status
 * SDB_STATUS_READY) and prepare the transaction.
 *
 * Every temporary created here (AST, plan, schema array, plan string,
 * transaction handle) is released on all exit paths.
 *
 * Returns 0 on success, -1 on failure.
 */
static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
  int32_t      code = -1;
  SNode       *pAst = NULL;
  SQueryPlan  *pPlan = NULL;
  STrans      *pTrans = NULL;
  SSdbRaw     *pRedoRaw = NULL;
  SPlanContext cxt = {0};
  SMqTopicObj  topicObj = {0};

  mDebug("topic:%s to create", pCreate->name);

  // Both strings are passed to strlen() below; a missing one is an invalid request, not a crash.
  if (pCreate->sql == NULL || pCreate->ast == NULL) {
    terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;

  // topicObj is only encoded inside this function, so the request's strings are
  // borrowed instead of duplicated; they are not freed here.
  topicObj.sql = pCreate->sql;
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
  topicObj.ast = pCreate->ast;
  topicObj.astLen = strlen(pCreate->ast) + 1;

  if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_OVER;
  }

  cxt.pAstRoot = pAst;
  cxt.topicQuery = true;
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_OVER;
  }

  if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_OVER;
  }

  if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_OVER;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

  pRedoRaw = mndTopicActionEncode(&topicObj);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
    goto CREATE_TOPIC_OVER;
  }
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto CREATE_TOPIC_OVER;
  }

  code = 0;

CREATE_TOPIC_OVER:
  taosMemoryFreeClear(topicObj.physicalPlan);
  // NOTE(review): assumes qExtractResultSchema hands back an array allocated with the
  // taosMemory* allocator and owned by the caller - confirm.
  taosMemoryFreeClear(topicObj.schema.pSchema);
  if (pAst != NULL) nodesDestroyNode(pAst);
  if (pPlan != NULL) nodesDestroyNode((SNode *)pPlan);
  if (pTrans != NULL) mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
353 354
/*
 * Handler for TDMT_MND_CREATE_TOPIC.
 *
 * Deserialises and validates the request, rejects (or, with igExists,
 * accepts) an already existing topic, resolves the owning database from the
 * topic name, checks the caller's write permission on it and finally calls
 * mndCreateTopic().
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS once creation has been submitted,
 * 0 when the topic already exists and igExists is set, and -1 on failure.
 */
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq) {
  SMnode           *pMnode = pReq->pNode;
  int32_t           code = -1;
  SMqTopicObj      *pTopic = NULL;
  SDbObj           *pDb = NULL;
  SUserObj         *pUser = NULL;
  SCMCreateTopicReq createTopicReq = {0};

  if (tDeserializeSCMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_TOPIC_OVER;
  }

  // sql is only validated below and may still be NULL here; never hand NULL to %s.
  mDebug("topic:%s, start to create, sql:%s", createTopicReq.name,
         createTopicReq.sql != NULL ? createTopicReq.sql : "");

  if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
    goto CREATE_TOPIC_OVER;
  }

  pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
  if (pTopic != NULL) {
    if (createTopicReq.igExists) {
      mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
      code = 0;
      goto CREATE_TOPIC_OVER;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
      goto CREATE_TOPIC_OVER;
    }
  } else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
    // The lookup failed for a reason other than "no such topic".
    goto CREATE_TOPIC_OVER;
  }

  pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto CREATE_TOPIC_OVER;
  }

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    goto CREATE_TOPIC_OVER;
  }

  if (mndCheckWriteAuth(pUser, pDb) != 0) {
    goto CREATE_TOPIC_OVER;
  }

  code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
  if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

CREATE_TOPIC_OVER:
  if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
  }

  mndReleaseTopic(pMnode, pTopic);
  mndReleaseDb(pMnode, pDb);
  mndReleaseUser(pMnode, pUser);

  tFreeSCMCreateTopicReq(&createTopicReq);
  return code;
}

S
Shengliang Guan 已提交
418
/*
 * Submit the removal of a topic through a TRN_TYPE_DROP_TOPIC transaction.
 *
 * The encoded topic is appended as a redo log marked SDB_STATUS_DROPPED and
 * the transaction is prepared. The transaction handle is dropped before
 * returning on every path where it was created.
 *
 * Returns 0 on success, -1 on failure.
 */
static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic) {
  // TODO: cannot drop when subscribed
  int32_t result = -1;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
  if (pTrans == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    return -1;
  }
  mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);

  SSdbRaw *pDropRaw = mndTopicActionEncode(pTopic);
  if (pDropRaw == NULL || mndTransAppendRedolog(pTrans, pDropRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
    goto DROP_TOPIC_OVER;
  }
  sdbSetRawStatus(pDropRaw, SDB_STATUS_DROPPED);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto DROP_TOPIC_OVER;
  }

  result = 0;

DROP_TOPIC_OVER:
  mndTransDrop(pTrans);
  return result;
}

S
Shengliang Guan 已提交
445 446
/*
 * Handler for TDMT_MND_DROP_TOPIC.
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS once the drop has been submitted,
 * 0 when the topic cannot be acquired and igNotExists is set, and -1 on
 * failure.
 */
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) {
  SMnode        *pMnode = pReq->pNode;
  SMDropTopicReq dropReq = {0};

  if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mDebug("topic:%s, start to drop", dropReq.name);

  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
  if (pTopic == NULL) {
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
      return -1;
    }
  }

  int32_t code = mndDropTopic(pMnode, pReq, pTopic);
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    // mndDropTopic() returns a bare -1; terrno is left untouched so the log line
    // and the caller see whatever error the failing step recorded.
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
    return -1;
  }

  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang Guan 已提交
480
// Handler for TDMT_VND_DROP_TOPIC_RSP: forwards the response to the transaction layer and returns 0.
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);

  return 0;
}

L
Liu Jicong 已提交
485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511
/*
 * Count the topics whose dbUid matches the database named dbName.
 *
 * Returns 0 and stores the count in *pNumOfTopics. Returns -1 with
 * terrno = TSDB_CODE_MND_DB_NOT_SELECTED when the database cannot be acquired.
 */
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t matched = 0;
  void   *pCursor = NULL;
  for (;;) {
    SMqTopicObj *pEntry = NULL;
    pCursor = sdbFetch(pSdb, SDB_TOPIC, pCursor, (void **)&pEntry);
    if (pCursor == NULL) break;

    matched += (pEntry->dbUid == pDb->uid) ? 1 : 0;

    // Each fetched row is released before the next fetch.
    sdbRelease(pSdb, pEntry);
  }

  *pNumOfTopics = matched;
  mndReleaseDb(pMnode, pDb);
  return 0;
}

S
Shengliang Guan 已提交
512 513
/*
 * Fill up to `rows` rows of the topic show table for the database pShow->db.
 *
 * Columns written per row, in order: topic name (with the "<db><delimiter>"
 * prefix stripped), creation time, and the topic SQL. `data` is laid out in
 * columns: each cell is addressed through pShow->offset[] and pShow->bytes[].
 * The iterator is kept in pShow->pIter so a later call continues where this
 * one stopped.
 *
 * Returns the number of rows written (0 when the database cannot be
 * acquired) and adds it to pShow->numOfRows.
 */
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode      *pMnode = pReq->pNode;
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      numOfRows = 0;
  SMqTopicObj *pTopic = NULL;
  int32_t      cols = 0;
  char        *pWrite;
  // Room for a full-length db name plus the delimiter and the terminating NUL.
  // Assumes TS_PATH_DELIMITER is a string-literal macro, so sizeof covers it - TODO confirm.
  char         prefix[TSDB_DB_FNAME_LEN + sizeof(TS_PATH_DELIMITER)] = {0};

  SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
  if (pDb == NULL) return 0;

  tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
  // Bounded append: never writes past the end of prefix.
  strncat(prefix, TS_PATH_DELIMITER, sizeof(prefix) - strlen(prefix) - 1);
  int32_t prefixLen = (int32_t)strlen(prefix);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) break;

    if (pTopic->dbUid != pDb->uid) {
      // A topic of another database is simply skipped. It is only inconsistent when
      // its name carries this database's prefix although the uid differs.
      if (strncmp(pTopic->name, prefix, prefixLen) == 0) {
        mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
      }

      sdbRelease(pSdb, pTopic);
      continue;
    }

    cols = 0;

    char topicName[TSDB_TOPIC_NAME_LEN] = {0};
    tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN);
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_TO_VARSTR(pWrite, topicName);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int64_t *)pWrite = pTopic->createTime;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]);
    cols++;

    numOfRows++;
    sdbRelease(pSdb, pTopic);
  }

  mndReleaseDb(pMnode, pDb);
  pShow->numOfRows += numOfRows;
  return numOfRows;
}

// Show-table iterator release hook: cancels an in-progress SDB fetch.
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}