mndTopic.c 20.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndTopic.h"
S
Shengliang Guan 已提交
17
#include "mndAuth.h"
L
Liu Jicong 已提交
18 19 20 21
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
22
#include "mndStb.h"
L
Liu Jicong 已提交
23 24 25
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
26
#include "parser.h"
L
Liu Jicong 已提交
27 28
#include "tname.h"

L
Liu Jicong 已提交
29
#define MND_TOPIC_VER_NUMBER   1
S
Shengliang Guan 已提交
30
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32 33 34
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
35 36 37
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
L
Liu Jicong 已提交
38

L
Liu Jicong 已提交
39
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
40
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
41

L
Liu Jicong 已提交
42 43
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);

L
Liu Jicong 已提交
44 45 46 47 48 49 50 51 52
/**
 * Sets up the topic module on an mnode: installs the request/response
 * handlers, the show-table handlers, and registers the SDB_TOPIC table.
 *
 * @param pMnode mnode being initialized.
 * @return whatever sdbSetTable() returns for the topic table.
 */
int32_t mndInitTopic(SMnode *pMnode) {
  // Fields not assigned below stay zero, as with a designated initializer.
  SSdbTable topicTable = {0};
  topicTable.sdbType = SDB_TOPIC;
  topicTable.keyType = SDB_KEY_BINARY;
  topicTable.encodeFp = (SdbEncodeFp)mndTopicActionEncode;
  topicTable.decodeFp = (SdbDecodeFp)mndTopicActionDecode;
  topicTable.insertFp = (SdbInsertFp)mndTopicActionInsert;
  topicTable.updateFp = (SdbUpdateFp)mndTopicActionUpdate;
  topicTable.deleteFp = (SdbDeleteFp)mndTopicActionDelete;

  // Message handlers.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);

  // Handlers backing the topics show table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);

  return sdbSetTable(pMnode->pSdb, topicTable);
}

// Cleanup counterpart of mndInitTopic(); the body is empty, so nothing is released here.
void mndCleanupTopic(SMnode *pMnode) {}

L
Liu Jicong 已提交
65 66 67 68 69
/**
 * Returns the part of a topic name that follows the first '.'.
 *
 * @param topic NUL-terminated topic name.
 * @return pointer into `topic` just past the first '.'; if the name contains
 *         no '.', `topic` itself is returned.
 */
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
  const char *pDot = strchr(topic, '.');
  // strchr() yields NULL when there is no '.'; adding 1 to NULL is undefined
  // behaviour and would hand callers a bogus pointer, so fall back to the
  // whole name instead.
  return (pDot != NULL) ? pDot + 1 : topic;
}

L
Liu Jicong 已提交
70
/**
 * Serializes a topic object into a newly allocated sdb raw record.
 *
 * Field order written here must match the order read by
 * mndTopicActionDecode(): name, db, createTime, updateTime, uid, dbUid,
 * version, subType, withTbName, withSchema, withTag, consumerCnt,
 * (sqlLen, sql), (astLen, ast), (physicalPlanLen, physicalPlan),
 * (schemaLen, encoded schema wrapper), reserved bytes.
 *
 * @param pTopic topic to encode; pTopic->physicalPlan must be a valid
 *               NUL-terminated string because its length is taken with strlen().
 * @return the encoded raw record, or NULL on failure with terrno set.
 */
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
  // terrno stays OUT_OF_MEMORY until every field has been written, so any
  // jump to TOPIC_ENCODE_OVER before that point is reported as a failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Declared up front so the cleanup label never sees indeterminate values.
  SSdbRaw *pRaw = NULL;
  void    *swBuf = NULL;
  void    *aswBuf = NULL;
  int32_t  dataPos = 0;

  int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
  // Called with a NULL buffer to obtain the encoded size of the schema wrapper.
  int32_t schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
  int32_t size =
      sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
  if (pRaw == NULL) goto TOPIC_ENCODE_OVER;

  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);

  SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);

  // The schema wrapper is encoded into a scratch buffer first and then copied
  // into the raw record.
  swBuf = taosMemoryMalloc(schemaLen);
  if (swBuf == NULL) {
    goto TOPIC_ENCODE_OVER;
  }
  // The encoder takes a pointer-to-pointer, so give it a copy and keep swBuf
  // pointing at the start of the buffer.
  aswBuf = swBuf;
  taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
  SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);

  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_ENCODE_OVER:
  // Fix: the scratch buffer was previously leaked on every path.
  taosMemoryFreeClear(swBuf);

  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
  return pRaw;
}

L
Liu Jicong 已提交
126
/**
 * Rebuilds a topic row from an sdb raw record written by
 * mndTopicActionEncode().
 *
 * Fields are read in the same order they were written. The sql, ast and
 * physicalPlan strings are heap-allocated and owned by the returned topic
 * object.
 *
 * @param pRaw raw record to decode.
 * @return the decoded row, or NULL on failure with terrno set
 *         (TSDB_CODE_SDB_INVALID_DATA_VER for an unexpected soft version,
 *         TSDB_CODE_OUT_OF_MEMORY unless a callee set something else).
 */
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
  // terrno stays at a failure code until the whole record has been read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Fix: everything the cleanup label touches is declared and initialized
  // before the first goto. Previously an early failure jumped past the
  // initialization of pRow/pTopic and then dereferenced/freed them.
  SSdbRow     *pRow = NULL;
  SMqTopicObj *pTopic = NULL;
  void        *buf = NULL;
  int32_t      len = 0;
  int32_t      dataPos = 0;
  int8_t       sver = 0;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;

  if (sver != MND_TOPIC_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto TOPIC_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqTopicObj));
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

  pTopic = sdbGetRowObj(pRow);
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;

  // Put the fields used by the error path into a known state, so the failure
  // branch can log the name and free the strings no matter how far decoding got.
  pTopic->name[0] = 0;
  pTopic->sql = NULL;
  pTopic->ast = NULL;
  pTopic->physicalPlan = NULL;

  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);

  SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER);

  // SQL text.
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
  if (pTopic->sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);

  // Serialized AST.
  SDB_GET_INT32(pRaw, dataPos, &pTopic->astLen, TOPIC_DECODE_OVER);
  pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char));
  if (pTopic->ast == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_DECODE_OVER);

  // Physical plan string (its length is stored in the record, not in the object).
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
  if (pTopic->physicalPlan == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);

  // Encoded schema wrapper: copied into a scratch buffer, then decoded.
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
  if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
    goto TOPIC_DECODE_OVER;
  }

  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_DECODE_OVER:
  // Fix: the scratch buffer was never released.
  // NOTE(review): assumes the decoded schema does not keep pointers into
  // `buf` - confirm against taosDecodeSSchemaWrapper().
  taosMemoryFreeClear(buf);

  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to decode from raw:%p since %s", (pTopic != NULL) ? pTopic->name : "", pRaw, terrstr());
    if (pTopic != NULL) {
      // Fix: strings allocated before the failure used to leak with the row.
      // NOTE(review): a schema decoded before a later failure is not released
      // here; add the matching schema free if one exists.
      taosMemoryFreeClear(pTopic->sql);
      taosMemoryFreeClear(pTopic->ast);
      taosMemoryFreeClear(pTopic->physicalPlan);
    }
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
  return pRow;
}

L
Liu Jicong 已提交
208
/**
 * sdb insert callback for topics. Only emits a trace line.
 *
 * @param pSdb   unused.
 * @param pTopic topic being inserted.
 * @return always 0.
 */
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
  (void)pSdb;
  mTrace("topic:%s, perform insert action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
213
/**
 * sdb delete callback for topics. Only emits a trace line.
 *
 * @param pSdb   unused.
 * @param pTopic topic being deleted.
 * @return always 0.
 */
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
  (void)pSdb;
  mTrace("topic:%s, perform delete action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
218
/**
 * sdb update callback for topics.
 *
 * Copies updateTime and version from the new object into the existing one
 * using atomic exchanges, then takes and releases the topic's write latch
 * around a section that is currently empty.
 *
 * @param pSdb      unused.
 * @param pOldTopic existing topic object that is updated in place.
 * @param pNewTopic object carrying the new values.
 * @return always 0.
 */
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
  (void)pSdb;
  mTrace("topic:%s, perform update action", pOldTopic->name);

  atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

  taosWLockLatch(&pOldTopic->lock);
  // TODO handle update
  taosWUnLockLatch(&pOldTopic->lock);

  return 0;
}

L
Liu Jicong 已提交
231 232 233
/**
 * Looks up a topic by name in the sdb.
 *
 * @param pMnode    mnode whose sdb is searched.
 * @param topicName key of the topic.
 * @return the object returned by sdbAcquire(), or NULL. When the lookup fails
 *         with TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 *         TSDB_CODE_MND_TOPIC_NOT_EXIST; other terrno values are left as-is.
 */
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SMqTopicObj *pFound = sdbAcquire(pMnode->pSdb, SDB_TOPIC, topicName);
  if (pFound != NULL) {
    return pFound;
  }

  // Translate the generic sdb "not there" code into the topic-specific one.
  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return NULL;
}

L
Liu Jicong 已提交
240
/**
 * Hands a topic object back to the sdb via sdbRelease().
 *
 * @param pMnode mnode whose sdb owns the object.
 * @param pTopic topic to release; passed through to sdbRelease() unchanged.
 */
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
  sdbRelease(pMnode->pSdb, pTopic);
}

L
Liu Jicong 已提交
245
#if 0
L
Liu Jicong 已提交
246 247
// Compiled out by the surrounding "#if 0".
// Parses the topic name into an SName, derives the full db name from it and
// acquires that db. The result of tNameFromString() is not checked.
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
  SName parsed = {0};
  char  fullDbName[TSDB_TOPIC_FNAME_LEN] = {0};

  tNameFromString(&parsed, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  tNameGetFullDbName(&parsed, fullDbName);

  return mndAcquireDb(pMnode, fullDbName);
}
L
Liu Jicong 已提交
255
#endif
L
Liu Jicong 已提交
256

S
Shengliang Guan 已提交
257 258
/**
 * Allocates and fills a drop-topic request addressed to one vgroup.
 *
 * The header fields are written with htonl() and the topic uid with
 * htobe64(); the name is copied as a fixed TSDB_TOPIC_FNAME_LEN-byte block.
 *
 * @param pMnode  unused.
 * @param pVgroup vgroup whose vgId goes into the header.
 * @param pTopic  topic supplying the name and uid.
 * @return the zero-initialized-then-filled request, or NULL with terrno set
 *         to TSDB_CODE_OUT_OF_MEMORY. The returned memory comes from
 *         taosMemoryCalloc().
 */
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  (void)pMnode;

  const int32_t   reqSize = sizeof(SDDropTopicReq);
  SDDropTopicReq *pMsg = taosMemoryCalloc(1, reqSize);
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pMsg->head.contLen = htonl(reqSize);
  pMsg->head.vgId = htonl(pVgroup->vgId);
  memcpy(pMsg->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
  pMsg->tuid = htobe64(pTopic->uid);

  return pMsg;
}

L
Liu Jicong 已提交
274
/**
 * Validates the mandatory fields of a create-topic request.
 *
 * The request is accepted only when the topic name, the SQL text and the
 * subscribed database name are all non-empty (the SQL pointer must also be
 * non-NULL).
 *
 * @param pCreate request to check.
 * @return 0 when valid; -1 with terrno = TSDB_CODE_MND_INVALID_TOPIC otherwise.
 */
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
  const bool hasName = pCreate->name[0] != 0;
  const bool hasSql = pCreate->sql != NULL && pCreate->sql[0] != 0;
  const bool hasDbName = pCreate->subscribeDbName[0] != 0;

  if (hasName && hasSql && hasDbName) {
    return 0;
  }

  terrno = TSDB_CODE_MND_INVALID_TOPIC;
  return -1;
}

S
Shengliang Guan 已提交
283
/**
 * Builds a topic object from a create request and submits it through a
 * transaction.
 *
 * When the request carries a non-empty AST the topic is a table subscription:
 * the AST is parsed, a query plan is created, the result schema is extracted
 * and the plan is serialized into physicalPlan. Otherwise the topic is a
 * database subscription with empty ast/physicalPlan strings and
 * withTbName/withSchema forced to 1.
 *
 * The topic object lives on the stack; its heap strings (sql, ast,
 * physicalPlan) are released on every exit path once the object has been
 * encoded into the redo log.
 *
 * @param pMnode  mnode owning the transaction.
 * @param pReq    originating request; its rpcMsg is attached to the transaction.
 * @param pCreate deserialized create-topic request.
 * @param pDb     database the topic subscribes to.
 * @return 0 on success, -1 on failure (terrno describes the cause).
 */
static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
  int32_t     code = -1;
  STrans     *pTrans = NULL;
  SSdbRaw    *pRedoRaw = NULL;
  SMqTopicObj topicObj = {0};

  mDebug("topic:%s to create", pCreate->name);

  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;

  topicObj.sql = strdup(pCreate->sql);
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
  if (topicObj.sql == NULL) {
    // Fix: strdup() results were previously used without a NULL check.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_CLEANUP;
  }

  if (pCreate->ast && pCreate->ast[0]) {
    // Table subscription: derive plan and schema from the supplied AST.
    topicObj.ast = strdup(pCreate->ast);
    topicObj.astLen = strlen(pCreate->ast) + 1;
    topicObj.subType = TOPIC_SUB_TYPE__TABLE;
    topicObj.withTbName = pCreate->withTbName;
    topicObj.withSchema = pCreate->withSchema;
    if (topicObj.ast == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_CLEANUP;
    }

    // NOTE(review): pAst, pPlan and topicObj.schema.pSchema are not released
    // anywhere in this function; add the matching destroy calls once their
    // ownership is confirmed.
    SNode *pAst = NULL;
    if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_CLEANUP;
    }

    SQueryPlan  *pPlan = NULL;
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_CLEANUP;
    }

    if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_CLEANUP;
    }

    if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_CLEANUP;
    }
  } else {
    // Database subscription: no AST and no plan, stored as empty strings.
    topicObj.ast = strdup("");
    topicObj.astLen = 1;
    topicObj.physicalPlan = strdup("");
    topicObj.subType = TOPIC_SUB_TYPE__DB;
    topicObj.withTbName = 1;
    topicObj.withSchema = 1;
    // The encoder calls strlen() on physicalPlan, so a failed strdup() must
    // not reach it.
    if (topicObj.ast == NULL || topicObj.physicalPlan == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_CLEANUP;
    }
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_CLEANUP;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

  pRedoRaw = mndTopicActionEncode(&topicObj);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
    goto CREATE_TOPIC_CLEANUP;
  }
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto CREATE_TOPIC_CLEANUP;
  }

  code = 0;

CREATE_TOPIC_CLEANUP:
  // Fix: sql and ast were leaked on success and on the redo/prepare failure
  // paths (only physicalPlan was freed there). topicObj is a local, so these
  // pointers would otherwise be lost on return.
  taosMemoryFreeClear(topicObj.physicalPlan);
  taosMemoryFreeClear(topicObj.ast);
  taosMemoryFreeClear(topicObj.sql);
  if (pTrans != NULL) {
    mndTransDrop(pTrans);
  }
  return code;
}

S
Shengliang Guan 已提交
374 375
/**
 * Handler for TDMT_MND_CREATE_TOPIC.
 *
 * Steps: deserialize the request, validate it, reject or ignore an existing
 * topic of the same name, acquire the subscribed database and the requesting
 * user, check write permission, then create the topic.
 *
 * @param pReq incoming request message.
 * @return TSDB_CODE_MND_ACTION_IN_PROGRESS when the create transaction was
 *         started, 0 when the topic already exists and igExists is set,
 *         otherwise -1 with terrno describing the failure.
 */
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq) {
  SMnode           *pMnode = pReq->pNode;
  SCMCreateTopicReq createTopicReq = {0};
  SMqTopicObj      *pExisting = NULL;
  SDbObj           *pDb = NULL;
  SUserObj         *pUser = NULL;
  int32_t           result = -1;

  if (tDeserializeSCMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_TOPIC_DONE;
  }

  mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);

  if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
    goto CREATE_TOPIC_DONE;
  }

  pExisting = mndAcquireTopic(pMnode, createTopicReq.name);
  if (pExisting != NULL && createTopicReq.igExists) {
    // Same name already present and the caller asked to tolerate that.
    mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
    result = 0;
    goto CREATE_TOPIC_DONE;
  }
  if (pExisting != NULL) {
    terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
    goto CREATE_TOPIC_DONE;
  }
  if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
    // The lookup failed for a reason other than "no such topic".
    goto CREATE_TOPIC_DONE;
  }

  pDb = mndAcquireDb(pMnode, createTopicReq.subscribeDbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto CREATE_TOPIC_DONE;
  }

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    goto CREATE_TOPIC_DONE;
  }

  if (mndCheckWriteAuth(pUser, pDb) != 0) {
    goto CREATE_TOPIC_DONE;
  }

  result = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
  if (result == 0) {
    result = TSDB_CODE_MND_ACTION_IN_PROGRESS;
  }

CREATE_TOPIC_DONE:
  if (result != 0 && result != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
  }

  // Each release is called unconditionally, including with NULL when the
  // corresponding acquire never ran or failed.
  mndReleaseTopic(pMnode, pExisting);
  mndReleaseDb(pMnode, pDb);
  mndReleaseUser(pMnode, pUser);

  tFreeSCMCreateTopicReq(&createTopicReq);
  return result;
}

S
Shengliang Guan 已提交
439
/**
 * Starts a transaction that marks a topic as dropped.
 *
 * The topic is encoded into a redo log entry, the entry is flagged
 * SDB_STATUS_DROPPED, and the transaction is prepared. The transaction handle
 * is dropped before returning on every path after it was created.
 *
 * @param pMnode mnode owning the transaction.
 * @param pReq   originating request; its rpcMsg is attached to the transaction.
 * @param pTopic topic to drop.
 * @return 0 on success, -1 on failure.
 */
static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic) {
  // TODO: cannot drop when subscribed
  int32_t status = -1;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
  if (pTrans == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    return status;
  }
  mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);

  SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
  } else {
    sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);

    if (mndTransPrepare(pMnode, pTrans) != 0) {
      mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    } else {
      status = 0;
    }
  }

  mndTransDrop(pTrans);
  return status;
}

S
Shengliang Guan 已提交
466 467
/**
 * Handler for TDMT_MND_DROP_TOPIC.
 *
 * @param pReq incoming request message.
 * @return TSDB_CODE_MND_ACTION_IN_PROGRESS when the drop transaction was
 *         started; 0 when the topic could not be acquired and igNotExists is
 *         set; -1 otherwise, with terrno describing the failure.
 */
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) {
  SMnode        *pMnode = pReq->pNode;
  SMDropTopicReq dropReq = {0};

  if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mDebug("topic:%s, start to drop", dropReq.name);

  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
  if (pTopic == NULL) {
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
      return -1;
    }
  }

  int32_t code = mndDropTopic(pMnode, pReq, pTopic);
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    // Fix: mndDropTopic() signals failure with -1, not an error code. The old
    // "terrno = code" therefore replaced the real cause with -1 right before
    // it was logged and returned to the caller. terrno is now left untouched.
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
    return -1;
  }

  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang Guan 已提交
501
/**
 * Handler for TDMT_VND_DROP_TOPIC_RSP: forwards the response to the
 * transaction layer and ignores whatever it returns.
 *
 * @param pRsp response message.
 * @return always 0.
 */
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp) {
  (void)mndTransProcessRsp(pRsp);
  return 0;
}

L
Liu Jicong 已提交
506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532
/**
 * Counts the topics whose dbUid matches the given database.
 *
 * @param pMnode       mnode whose sdb is scanned.
 * @param dbName       name of the database to look up.
 * @param pNumOfTopics receives the count on success.
 * @return 0 on success; -1 with terrno = TSDB_CODE_MND_DB_NOT_SELECTED when
 *         the database cannot be acquired.
 */
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  SSdb   *pSdb = pMnode->pSdb;
  int32_t matched = 0;
  void   *pCursor = NULL;

  // Walk every topic row; each fetched object is released after inspection.
  for (;;) {
    SMqTopicObj *pTopic = NULL;
    pCursor = sdbFetch(pSdb, SDB_TOPIC, pCursor, (void **)&pTopic);
    if (pCursor == NULL) {
      break;
    }

    if (pTopic->dbUid == pDb->uid) {
      ++matched;
    }
    sdbRelease(pSdb, pTopic);
  }

  *pNumOfTopics = matched;
  mndReleaseDb(pMnode, pDb);
  return 0;
}

L
Liu Jicong 已提交
533
/**
 * Show-retrieve handler for the topics table.
 *
 * Continues the sdb iteration stored in pShow->pIter and appends one row per
 * topic to pBlock until either rowsCapacity rows were written or the
 * iteration ends. Four columns are filled per row, in this order: topic name,
 * database name, create time, SQL text.
 *
 * @param pReq         request whose pNode gives access to the sdb.
 * @param pShow        show state; pIter is advanced and numOfRows increased.
 * @param pBlock       data block whose columns receive the values.
 * @param rowsCapacity maximum number of rows to write in this call.
 * @return number of rows written in this call.
 */
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode      *pMnode = pReq->pNode;
  SSdb        *pSdb = pMnode->pSdb;
  SMqTopicObj *pTopic = NULL;
  int32_t      rowIdx = 0;

  for (; rowIdx < rowsCapacity; ++rowIdx) {
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) break;

    SColumnInfoData *pCol;
    SName            parsed;
    int32_t          colIdx = 0;

    // Column 0: topic name, built as a var-string (length header + text).
    char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&parsed, pTopic->name, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&parsed, varDataVal(topicName));
    varDataSetLen(topicName, strlen(varDataVal(topicName)));
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pCol, rowIdx, (const char *)topicName, false);

    // Column 1: database name, same var-string layout.
    char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&parsed, pTopic->db, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&parsed, varDataVal(dbName));
    varDataSetLen(dbName, strlen(varDataVal(dbName)));
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pCol, rowIdx, (const char *)dbName, false);

    // Column 2: creation timestamp.
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pCol, rowIdx, (const char *)&pTopic->createTime, false);

    // Column 3: SQL text, copied with a TSDB_SHOW_SQL_LEN bound.
    char sqlText[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(&sqlText[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
    varDataSetLen(sqlText, strlen(&sqlText[VARSTR_HEADER_SIZE]));
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pCol, rowIdx, (const char *)sqlText, false);

    sdbRelease(pSdb, pTopic);
  }

  pShow->numOfRows += rowIdx;
  return rowIdx;
}

L
Liu Jicong 已提交
578 579 580 581 582 583 584 585 586
/**
 * Appends a commit-log entry to the transaction that marks the topic as
 * dropped.
 *
 * @param pMnode unused.
 * @param pTrans transaction receiving the commit log.
 * @param pTopic topic to encode.
 * @return 0 on success; -1 when encoding, appending, or setting the raw
 *         status fails.
 */
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  (void)pMnode;

  SSdbRaw *pDropRaw = mndTopicActionEncode(pTopic);
  if (pDropRaw == NULL) {
    return -1;
  }

  // Short-circuit keeps the original order: append first, then set status.
  if (mndTransAppendCommitlog(pTrans, pDropRaw) != 0 || sdbSetRawStatus(pDropRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
587 588 589 590
/**
 * Show free-iterator handler for topics: cancels an sdb fetch in progress.
 *
 * @param pMnode mnode whose sdb owns the iterator.
 * @param pIter  iterator to cancel.
 */
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}
L
Liu Jicong 已提交
591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615

/**
 * Adds drop commit logs to a transaction for every topic that belongs to the
 * given database.
 *
 * @param pMnode mnode whose sdb is scanned.
 * @param pTrans transaction that receives one commit log per matching topic.
 * @param pDb    database being dropped; topics are matched on dbUid == pDb->uid.
 * @return 0 when all matching topics were handled, -1 as soon as appending a
 *         commit log fails.
 */
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = -1;
  SSdb   *pSdb = pMnode->pSdb;

  void        *pIter = NULL;
  SMqTopicObj *pTopic = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid != pDb->uid) {
      sdbRelease(pSdb, pTopic);
      continue;
    }

    if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
      // Fix: leaving the loop early used to leak both the fetched topic and
      // the in-progress iterator.
      sdbRelease(pSdb, pTopic);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }

    // Fix: matching topics were never released after their commit log was
    // appended, unlike the non-matching ones above.
    sdbRelease(pSdb, pTopic);
  }

  code = 0;
END:
  return code;
}