mndTopic.c 19.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndTopic.h"
S
Shengliang Guan 已提交
17
#include "mndAuth.h"
L
Liu Jicong 已提交
18 19 20 21
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
22
#include "mndStb.h"
L
Liu Jicong 已提交
23 24 25
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
26
#include "parser.h"
L
Liu Jicong 已提交
27 28
#include "tname.h"

L
Liu Jicong 已提交
29
#define MND_TOPIC_VER_NUMBER 1
S
Shengliang Guan 已提交
30
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32 33 34
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
35 36 37 38 39 40
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
static int32_t mndProcessTopicMetaReq(SNodeMsg *pReq);
static int32_t mndGetTopicMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42 43 44 45 46 47 48 49 50 51

int32_t mndInitTopic(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_TOPIC,
                     .keyType = SDB_KEY_BINARY,
                     .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
                     .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
                     .insertFp = (SdbInsertFp)mndTopicActionInsert,
                     .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndTopicActionDelete};

S
Shengliang Guan 已提交
52 53
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
H
Hongze Cheng 已提交
54
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
L
Liu Jicong 已提交
55

S
Shengliang Guan 已提交
56 57 58
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextTopic);

L
Liu Jicong 已提交
59 60 61 62 63
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupTopic(SMnode *pMnode) {}

L
Liu Jicong 已提交
64
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
65 66
  terrno = TSDB_CODE_OUT_OF_MEMORY;

L
fix  
Liu Jicong 已提交
67 68 69 70 71
  int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
  int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
  int32_t swLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
  int32_t size =
      sizeof(SMqTopicObj) + logicalPlanLen + physicalPlanLen + pTopic->sqlLen + swLen + MND_TOPIC_RESERVE_SIZE;
S
Shengliang Guan 已提交
72
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
L
Liu Jicong 已提交
73
  if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
L
Liu Jicong 已提交
74 75

  int32_t dataPos = 0;
L
Liu Jicong 已提交
76
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
77 78 79 80 81 82 83 84
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
85
  SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER);
86
  SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
87
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
88
  SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
89

L
fix  
Liu Jicong 已提交
90
  void *swBuf = taosMemoryMalloc(swLen);
L
Liu Jicong 已提交
91 92 93 94 95 96 97 98
  if (swBuf == NULL) {
    goto TOPIC_ENCODE_OVER;
  }
  void *aswBuf = swBuf;
  taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
  SDB_SET_INT32(pRaw, dataPos, swLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, swBuf, swLen, TOPIC_ENCODE_OVER);

L
Liu Jicong 已提交
99 100 101 102 103 104 105 106 107 108 109 110 111
  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_ENCODE_OVER:
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
112 113 114
  return pRaw;
}

L
Liu Jicong 已提交
115
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
116
  terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
117
  int8_t sver = 0;
118
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
119

S
Shengliang Guan 已提交
120
  if (sver != MND_TOPIC_VER_NUMBER) {
L
Liu Jicong 已提交
121
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
122
    goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
123 124
  }

L
Liu Jicong 已提交
125 126
  int32_t  size = sizeof(SMqTopicObj);
  SSdbRow *pRow = sdbAllocRow(size);
127 128
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

L
Liu Jicong 已提交
129
  SMqTopicObj *pTopic = sdbGetRowObj(pRow);
130
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
131

L
Liu Jicong 已提交
132
  int32_t len;
L
Liu Jicong 已提交
133
  int32_t dataPos = 0;
L
Liu Jicong 已提交
134
  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
135 136 137 138 139 140 141
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
142

wafwerar's avatar
wafwerar 已提交
143
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen + 1, sizeof(char));
L
Liu Jicong 已提交
144
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
145 146

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
wafwerar's avatar
wafwerar 已提交
147
  pTopic->logicalPlan = taosMemoryCalloc(len + 1, sizeof(char));
148 149 150 151
  if (pTopic->logicalPlan == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
L
Liu Jicong 已提交
152
  SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
153 154

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
wafwerar's avatar
wafwerar 已提交
155
  pTopic->physicalPlan = taosMemoryCalloc(len + 1, sizeof(char));
156
  if (pTopic->physicalPlan == NULL) {
wafwerar's avatar
wafwerar 已提交
157
    taosMemoryFree(pTopic->logicalPlan);
158 159 160
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
L
Liu Jicong 已提交
161
  SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
162

L
Liu Jicong 已提交
163 164 165 166 167 168 169 170 171 172 173
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  void *buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
  if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
    goto TOPIC_DECODE_OVER;
  }

L
Liu Jicong 已提交
174
  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
175

L
Liu Jicong 已提交
176
  terrno = TSDB_CODE_SUCCESS;
177 178

TOPIC_DECODE_OVER:
L
Liu Jicong 已提交
179
  if (terrno != TSDB_CODE_SUCCESS) {
180
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
181
    taosMemoryFreeClear(pRow);
182 183
    return NULL;
  }
L
Liu Jicong 已提交
184

185
  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
186 187 188
  return pRow;
}

L
Liu Jicong 已提交
189
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
190 191 192 193
  mTrace("topic:%s, perform insert action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
194
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
195 196 197 198
  mTrace("topic:%s, perform delete action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
199
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
L
Liu Jicong 已提交
200
  mTrace("topic:%s, perform update action", pOldTopic->name);
wafwerar's avatar
wafwerar 已提交
201
  atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
L
Liu Jicong 已提交
202 203 204
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

  taosWLockLatch(&pOldTopic->lock);
L
Liu Jicong 已提交
205 206

  // TODO handle update
L
Liu Jicong 已提交
207 208 209 210 211

  taosWUnLockLatch(&pOldTopic->lock);
  return 0;
}

L
Liu Jicong 已提交
212 213 214
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SSdb        *pSdb = pMnode->pSdb;
  SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
S
Shengliang Guan 已提交
215
  if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
L
Liu Jicong 已提交
216 217 218 219 220
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return pTopic;
}

L
Liu Jicong 已提交
221
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
222 223 224 225 226 227
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pTopic);
}

static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
  SName name = {0};
228
  tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
L
Liu Jicong 已提交
229

L
Liu Jicong 已提交
230
  char db[TSDB_TOPIC_FNAME_LEN] = {0};
L
Liu Jicong 已提交
231 232 233 234 235
  tNameGetFullDbName(&name, db);

  return mndAcquireDb(pMnode, db);
}

S
Shengliang Guan 已提交
236 237
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  int32_t contLen = sizeof(SDDropTopicReq);
L
Liu Jicong 已提交
238

wafwerar's avatar
wafwerar 已提交
239
  SDDropTopicReq *pDrop = taosMemoryCalloc(1, contLen);
L
Liu Jicong 已提交
240 241 242 243 244 245 246
  if (pDrop == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pDrop->head.contLen = htonl(contLen);
  pDrop->head.vgId = htonl(pVgroup->vgId);
L
Liu Jicong 已提交
247
  memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
L
Liu Jicong 已提交
248 249 250 251 252
  pDrop->tuid = htobe64(pTopic->uid);

  return pDrop;
}

L
Liu Jicong 已提交
253
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
S
Shengliang Guan 已提交
254 255 256 257
  if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
    terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
    return -1;
  }
L
Liu Jicong 已提交
258 259 260
  return 0;
}

L
Liu Jicong 已提交
261
static int32_t mndGetPlanString(const SCMCreateTopicReq *pCreate, char **pStr) {
262 263 264 265
  if (NULL == pCreate->ast) {
    return TSDB_CODE_SUCCESS;
  }

L
Liu Jicong 已提交
266
  SNode  *pAst = NULL;
267 268
  int32_t code = nodesStringToNode(pCreate->ast, &pAst);

L
Liu Jicong 已提交
269
  SQueryPlan *pPlan = NULL;
270
  if (TSDB_CODE_SUCCESS == code) {
L
Liu Jicong 已提交
271
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
272 273 274 275 276 277 278 279 280 281 282 283
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = nodesNodeToString(pPlan, false, pStr, NULL);
  }
  nodesDestroyNode(pAst);
  nodesDestroyNode(pPlan);
  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
284
static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
L
Liu Jicong 已提交
285
  mDebug("topic:%s to create", pCreate->name);
L
Liu Jicong 已提交
286
  SMqTopicObj topicObj = {0};
287
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
288
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
289 290
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
L
Liu Jicong 已提交
291
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
L
Liu Jicong 已提交
292 293
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;
L
Liu Jicong 已提交
294
  topicObj.sql = pCreate->sql;
L
Liu Jicong 已提交
295
  topicObj.physicalPlan = "";
296
  topicObj.logicalPlan = "";
297
  topicObj.sqlLen = strlen(pCreate->sql);
L
Liu Jicong 已提交
298

L
Liu Jicong 已提交
299
  char *pPlanStr = NULL;
300
  if (TSDB_CODE_SUCCESS != mndGetPlanString(pCreate, &pPlanStr)) {
301 302 303
    mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());
    return -1;
  }
L
Liu Jicong 已提交
304 305 306
  if (NULL != pPlanStr) {
    topicObj.physicalPlan = pPlanStr;
  }
307

L
Liu Jicong 已提交
308 309 310 311 312 313 314 315
  SNode *pAst = NULL;
  if (nodesStringToNode(pCreate->ast, &pAst) < 0) {
    return -1;
  }
  if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
    return -1;
  }

S
Shengliang Guan 已提交
316
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
S
Shengliang Guan 已提交
317 318
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
wafwerar's avatar
wafwerar 已提交
319
    taosMemoryFreeClear(pPlanStr);
S
Shengliang Guan 已提交
320 321 322 323 324 325 326
    return -1;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

  SSdbRaw *pRedoRaw = mndTopicActionEncode(&topicObj);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
wafwerar's avatar
wafwerar 已提交
327
    taosMemoryFreeClear(pPlanStr);
S
Shengliang Guan 已提交
328 329 330 331 332 333 334
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
wafwerar's avatar
wafwerar 已提交
335
    taosMemoryFreeClear(pPlanStr);
S
Shengliang Guan 已提交
336 337 338 339
    mndTransDrop(pTrans);
    return -1;
  }

wafwerar's avatar
wafwerar 已提交
340
  taosMemoryFreeClear(pPlanStr);
S
Shengliang Guan 已提交
341 342
  mndTransDrop(pTrans);
  return 0;
L
Liu Jicong 已提交
343 344
}

S
Shengliang Guan 已提交
345 346
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq) {
  SMnode           *pMnode = pReq->pNode;
L
Liu Jicong 已提交
347 348 349 350 351 352 353
  int32_t           code = -1;
  SMqTopicObj      *pTopic = NULL;
  SDbObj           *pDb = NULL;
  SUserObj         *pUser = NULL;
  SCMCreateTopicReq createTopicReq = {0};

  if (tDeserializeSCMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) {
S
Shengliang Guan 已提交
354 355 356
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_TOPIC_OVER;
  }
357 358

  mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
L
Liu Jicong 已提交
359

S
Shengliang Guan 已提交
360
  if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
361
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
S
Shengliang Guan 已提交
362
    goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
363 364
  }

S
Shengliang Guan 已提交
365
  pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
L
Liu Jicong 已提交
366
  if (pTopic != NULL) {
367 368
    if (createTopicReq.igExists) {
      mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
S
Shengliang Guan 已提交
369 370
      code = 0;
      goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
371 372
    } else {
      terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
S
Shengliang Guan 已提交
373
      goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
374
    }
S
Shengliang Guan 已提交
375 376
  } else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
    goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
377 378
  }

S
Shengliang Guan 已提交
379
  pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name);
L
Liu Jicong 已提交
380 381
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
S
Shengliang Guan 已提交
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
    goto CREATE_TOPIC_OVER;
  }

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    goto CREATE_TOPIC_OVER;
  }

  if (mndCheckWriteAuth(pUser, pDb) != 0) {
    goto CREATE_TOPIC_OVER;
  }

  code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
  if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

CREATE_TOPIC_OVER:
  if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
399
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
L
Liu Jicong 已提交
400 401
  }

S
Shengliang Guan 已提交
402
  mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
403
  mndReleaseDb(pMnode, pDb);
S
Shengliang Guan 已提交
404
  mndReleaseUser(pMnode, pUser);
L
Liu Jicong 已提交
405

L
Liu Jicong 已提交
406
  tFreeSCMCreateTopicReq(&createTopicReq);
S
Shengliang Guan 已提交
407 408 409
  return code;
}

S
Shengliang Guan 已提交
410
static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
411
  // TODO: cannot drop when subscribed
S
Shengliang Guan 已提交
412
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
S
Shengliang Guan 已提交
413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
  if (pTrans == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    return -1;
  }
  mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);

  SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
L
Liu Jicong 已提交
430 431 432
    return -1;
  }

S
Shengliang Guan 已提交
433
  mndTransDrop(pTrans);
L
Liu Jicong 已提交
434
  return 0;
L
Liu Jicong 已提交
435 436
}

S
Shengliang Guan 已提交
437 438
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) {
  SMnode        *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
439
  SMDropTopicReq dropReq = {0};
L
Liu Jicong 已提交
440

S
Shengliang Guan 已提交
441 442 443 444 445 446
  if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mDebug("topic:%s, start to drop", dropReq.name);
L
Liu Jicong 已提交
447

S
Shengliang Guan 已提交
448
  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
L
Liu Jicong 已提交
449
  if (pTopic == NULL) {
S
Shengliang Guan 已提交
450 451
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
L
Liu Jicong 已提交
452 453 454
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
S
Shengliang Guan 已提交
455
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
L
Liu Jicong 已提交
456 457 458 459
      return -1;
    }
  }

S
Shengliang Guan 已提交
460
  int32_t code = mndDropTopic(pMnode, pReq, pTopic);
L
Liu Jicong 已提交
461 462 463 464
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    terrno = code;
S
Shengliang Guan 已提交
465
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
L
Liu Jicong 已提交
466 467 468 469 470 471
    return -1;
  }

  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang Guan 已提交
472
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp) {
S
Shengliang Guan 已提交
473
  mndTransProcessRsp(pRsp);
L
Liu Jicong 已提交
474 475 476
  return 0;
}

L
Liu Jicong 已提交
477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t numOfTopics = 0;
  void   *pIter = NULL;
  while (1) {
    SMqTopicObj *pTopic = NULL;
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid == pDb->uid) {
      numOfTopics++;
    }

    sdbRelease(pSdb, pTopic);
  }

  *pNumOfTopics = numOfTopics;
  mndReleaseDb(pMnode, pDb);
  return 0;
}

S
Shengliang Guan 已提交
504 505
static int32_t mndGetTopicMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode *pMnode = pReq->pNode;
L
Liu Jicong 已提交
506 507 508 509 510 511 512
  SSdb   *pSdb = pMnode->pSdb;

  if (mndGetNumOfTopics(pMnode, pShow->db, &pShow->numOfRows) != 0) {
    return -1;
  }

  int32_t  cols = 0;
S
Shengliang Guan 已提交
513
  SSchema *pSchema = pMeta->pSchemas;
L
Liu Jicong 已提交
514 515 516 517

  pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "name");
S
Shengliang Guan 已提交
518
  pSchema[cols].bytes = pShow->bytes[cols];
L
Liu Jicong 已提交
519 520 521 522 523
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "create_time");
S
Shengliang Guan 已提交
524
  pSchema[cols].bytes = pShow->bytes[cols];
L
Liu Jicong 已提交
525 526
  cols++;

S
Shengliang Guan 已提交
527 528 529 530 531 532
  pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

S
Shengliang Guan 已提交
533
  pMeta->numOfColumns = cols;
L
Liu Jicong 已提交
534 535 536 537 538 539 540 541 542
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = sdbGetSize(pSdb, SDB_TOPIC);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
543
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
L
Liu Jicong 已提交
544 545 546 547

  return 0;
}

S
Shengliang Guan 已提交
548 549
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode      *pMnode = pReq->pNode;
L
Liu Jicong 已提交
550 551 552 553 554
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      numOfRows = 0;
  SMqTopicObj *pTopic = NULL;
  int32_t      cols = 0;
  char        *pWrite;
S
Shengliang Guan 已提交
555 556 557 558
  char         prefix[TSDB_DB_FNAME_LEN] = {0};

  SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
  if (pDb == NULL) return 0;
L
Liu Jicong 已提交
559

S
Shengliang Guan 已提交
560
  tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
561 562 563 564 565 566 567
  strcat(prefix, TS_PATH_DELIMITER);
  int32_t prefixLen = (int32_t)strlen(prefix);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) break;

S
Shengliang Guan 已提交
568 569
    if (pTopic->dbUid != pDb->uid) {
      if (strncmp(pTopic->name, prefix, prefixLen) != 0) {
L
Liu Jicong 已提交
570
        mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
S
Shengliang Guan 已提交
571 572
      }

L
Liu Jicong 已提交
573 574 575 576 577 578
      sdbRelease(pSdb, pTopic);
      continue;
    }

    cols = 0;

L
Liu Jicong 已提交
579 580
    char topicName[TSDB_TOPIC_NAME_LEN] = {0};
    tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN);
L
Liu Jicong 已提交
581 582 583 584 585 586 587 588
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_TO_VARSTR(pWrite, topicName);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int64_t *)pWrite = pTopic->createTime;
    cols++;

S
Shengliang Guan 已提交
589 590 591 592
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]);
    cols++;

L
Liu Jicong 已提交
593 594 595 596
    numOfRows++;
    sdbRelease(pSdb, pTopic);
  }

S
Shengliang Guan 已提交
597
  mndReleaseDb(pMnode, pDb);
L
Liu Jicong 已提交
598 599 600 601 602 603 604 605 606
  pShow->numOfReads += numOfRows;
  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  return numOfRows;
}

static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}