mndTopic.c 19.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndTopic.h"
S
Shengliang Guan 已提交
17
#include "mndAuth.h"
L
Liu Jicong 已提交
18 19 20 21
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
22
#include "mndStb.h"
L
Liu Jicong 已提交
23 24 25
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
26
#include "parser.h"
L
Liu Jicong 已提交
27 28
#include "tname.h"

L
Liu Jicong 已提交
29
#define MND_TOPIC_VER_NUMBER 1
S
Shengliang Guan 已提交
30
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32 33 34
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
35 36 37 38 39 40
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
static int32_t mndProcessTopicMetaReq(SNodeMsg *pReq);
static int32_t mndGetTopicMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42 43 44 45 46 47 48 49 50 51

int32_t mndInitTopic(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_TOPIC,
                     .keyType = SDB_KEY_BINARY,
                     .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
                     .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
                     .insertFp = (SdbInsertFp)mndTopicActionInsert,
                     .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndTopicActionDelete};

S
Shengliang Guan 已提交
52 53
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
H
Hongze Cheng 已提交
54
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
L
Liu Jicong 已提交
55

S
Shengliang Guan 已提交
56 57 58 59
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetTopicMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextTopic);

L
Liu Jicong 已提交
60 61 62 63 64
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupTopic(SMnode *pMnode) {}

L
Liu Jicong 已提交
65
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
66 67
  terrno = TSDB_CODE_OUT_OF_MEMORY;

L
fix  
Liu Jicong 已提交
68 69 70 71 72
  int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
  int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
  int32_t swLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
  int32_t size =
      sizeof(SMqTopicObj) + logicalPlanLen + physicalPlanLen + pTopic->sqlLen + swLen + MND_TOPIC_RESERVE_SIZE;
S
Shengliang Guan 已提交
73
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
L
Liu Jicong 已提交
74
  if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
L
Liu Jicong 已提交
75 76

  int32_t dataPos = 0;
L
Liu Jicong 已提交
77
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
78 79 80 81 82 83 84 85
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
86
  SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER);
87
  SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
88
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
89
  SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
90

L
fix  
Liu Jicong 已提交
91
  void *swBuf = taosMemoryMalloc(swLen);
L
Liu Jicong 已提交
92 93 94 95 96 97 98 99
  if (swBuf == NULL) {
    goto TOPIC_ENCODE_OVER;
  }
  void *aswBuf = swBuf;
  taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
  SDB_SET_INT32(pRaw, dataPos, swLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, swBuf, swLen, TOPIC_ENCODE_OVER);

L
Liu Jicong 已提交
100 101 102 103 104 105 106 107 108 109 110 111 112
  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_ENCODE_OVER:
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
113 114 115
  return pRaw;
}

L
Liu Jicong 已提交
116
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
117
  terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
118
  int8_t sver = 0;
119
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
120

S
Shengliang Guan 已提交
121
  if (sver != MND_TOPIC_VER_NUMBER) {
L
Liu Jicong 已提交
122
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
123
    goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
124 125
  }

L
Liu Jicong 已提交
126 127
  int32_t  size = sizeof(SMqTopicObj);
  SSdbRow *pRow = sdbAllocRow(size);
128 129
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

L
Liu Jicong 已提交
130
  SMqTopicObj *pTopic = sdbGetRowObj(pRow);
131
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
132

L
Liu Jicong 已提交
133
  int32_t len;
L
Liu Jicong 已提交
134
  int32_t dataPos = 0;
L
Liu Jicong 已提交
135
  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
136 137 138 139 140 141 142
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
143

wafwerar's avatar
wafwerar 已提交
144
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen + 1, sizeof(char));
L
Liu Jicong 已提交
145
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
146 147

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
wafwerar's avatar
wafwerar 已提交
148
  pTopic->logicalPlan = taosMemoryCalloc(len + 1, sizeof(char));
149 150 151 152
  if (pTopic->logicalPlan == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
L
Liu Jicong 已提交
153
  SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
154 155

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
wafwerar's avatar
wafwerar 已提交
156
  pTopic->physicalPlan = taosMemoryCalloc(len + 1, sizeof(char));
157
  if (pTopic->physicalPlan == NULL) {
wafwerar's avatar
wafwerar 已提交
158
    taosMemoryFree(pTopic->logicalPlan);
159 160 161
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
L
Liu Jicong 已提交
162
  SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
163

L
Liu Jicong 已提交
164 165 166 167 168 169 170 171 172 173 174
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  void *buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
  if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
    goto TOPIC_DECODE_OVER;
  }

L
Liu Jicong 已提交
175
  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
176

L
Liu Jicong 已提交
177
  terrno = TSDB_CODE_SUCCESS;
178 179

TOPIC_DECODE_OVER:
L
Liu Jicong 已提交
180
  if (terrno != TSDB_CODE_SUCCESS) {
181
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
182
    taosMemoryFreeClear(pRow);
183 184
    return NULL;
  }
L
Liu Jicong 已提交
185

186
  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
187 188 189
  return pRow;
}

L
Liu Jicong 已提交
190
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
191 192 193 194
  mTrace("topic:%s, perform insert action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
195
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
196 197 198 199
  mTrace("topic:%s, perform delete action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
200
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
L
Liu Jicong 已提交
201
  mTrace("topic:%s, perform update action", pOldTopic->name);
wafwerar's avatar
wafwerar 已提交
202
  atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
L
Liu Jicong 已提交
203 204 205
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

  taosWLockLatch(&pOldTopic->lock);
L
Liu Jicong 已提交
206 207

  // TODO handle update
L
Liu Jicong 已提交
208 209 210 211 212

  taosWUnLockLatch(&pOldTopic->lock);
  return 0;
}

L
Liu Jicong 已提交
213 214 215
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SSdb        *pSdb = pMnode->pSdb;
  SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
S
Shengliang Guan 已提交
216
  if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
L
Liu Jicong 已提交
217 218 219 220 221
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return pTopic;
}

L
Liu Jicong 已提交
222
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
223 224 225 226 227 228
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pTopic);
}

static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
  SName name = {0};
229
  tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
L
Liu Jicong 已提交
230

L
Liu Jicong 已提交
231
  char db[TSDB_TOPIC_FNAME_LEN] = {0};
L
Liu Jicong 已提交
232 233 234 235 236
  tNameGetFullDbName(&name, db);

  return mndAcquireDb(pMnode, db);
}

S
Shengliang Guan 已提交
237 238
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  int32_t contLen = sizeof(SDDropTopicReq);
L
Liu Jicong 已提交
239

wafwerar's avatar
wafwerar 已提交
240
  SDDropTopicReq *pDrop = taosMemoryCalloc(1, contLen);
L
Liu Jicong 已提交
241 242 243 244 245 246 247
  if (pDrop == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pDrop->head.contLen = htonl(contLen);
  pDrop->head.vgId = htonl(pVgroup->vgId);
L
Liu Jicong 已提交
248
  memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
L
Liu Jicong 已提交
249 250 251 252 253
  pDrop->tuid = htobe64(pTopic->uid);

  return pDrop;
}

L
Liu Jicong 已提交
254
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
S
Shengliang Guan 已提交
255 256 257 258
  if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
    terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
    return -1;
  }
L
Liu Jicong 已提交
259 260 261
  return 0;
}

L
Liu Jicong 已提交
262
static int32_t mndGetPlanString(const SCMCreateTopicReq *pCreate, char **pStr) {
263 264 265 266
  if (NULL == pCreate->ast) {
    return TSDB_CODE_SUCCESS;
  }

L
Liu Jicong 已提交
267
  SNode  *pAst = NULL;
268 269
  int32_t code = nodesStringToNode(pCreate->ast, &pAst);

L
Liu Jicong 已提交
270
  SQueryPlan *pPlan = NULL;
271
  if (TSDB_CODE_SUCCESS == code) {
L
Liu Jicong 已提交
272
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
273 274 275 276 277 278 279 280 281 282 283 284
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = nodesNodeToString(pPlan, false, pStr, NULL);
  }
  nodesDestroyNode(pAst);
  nodesDestroyNode(pPlan);
  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
285
static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
L
Liu Jicong 已提交
286
  mDebug("topic:%s to create", pCreate->name);
L
Liu Jicong 已提交
287
  SMqTopicObj topicObj = {0};
288
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
289
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
290 291
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
L
Liu Jicong 已提交
292
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
L
Liu Jicong 已提交
293 294
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;
L
Liu Jicong 已提交
295
  topicObj.sql = pCreate->sql;
L
Liu Jicong 已提交
296
  topicObj.physicalPlan = NULL;
297
  topicObj.logicalPlan = "";
298
  topicObj.sqlLen = strlen(pCreate->sql);
L
Liu Jicong 已提交
299

L
Liu Jicong 已提交
300
  char *pPlanStr = NULL;
301
  if (TSDB_CODE_SUCCESS != mndGetPlanString(pCreate, &pPlanStr)) {
302 303 304
    mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());
    return -1;
  }
L
Liu Jicong 已提交
305
  topicObj.physicalPlan = pPlanStr;
306

L
Liu Jicong 已提交
307 308 309 310 311 312 313 314
  SNode *pAst = NULL;
  if (nodesStringToNode(pCreate->ast, &pAst) < 0) {
    return -1;
  }
  if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
    return -1;
  }

S
Shengliang Guan 已提交
315
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
S
Shengliang Guan 已提交
316 317
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
wafwerar's avatar
wafwerar 已提交
318
    taosMemoryFreeClear(pPlanStr);
S
Shengliang Guan 已提交
319 320 321 322 323 324 325
    return -1;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

  SSdbRaw *pRedoRaw = mndTopicActionEncode(&topicObj);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
wafwerar's avatar
wafwerar 已提交
326
    taosMemoryFreeClear(pPlanStr);
S
Shengliang Guan 已提交
327 328 329 330 331 332 333
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
wafwerar's avatar
wafwerar 已提交
334
    taosMemoryFreeClear(pPlanStr);
S
Shengliang Guan 已提交
335 336 337 338
    mndTransDrop(pTrans);
    return -1;
  }

wafwerar's avatar
wafwerar 已提交
339
  taosMemoryFreeClear(pPlanStr);
S
Shengliang Guan 已提交
340 341
  mndTransDrop(pTrans);
  return 0;
L
Liu Jicong 已提交
342 343
}

S
Shengliang Guan 已提交
344 345
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq) {
  SMnode           *pMnode = pReq->pNode;
L
Liu Jicong 已提交
346 347 348 349 350 351 352
  int32_t           code = -1;
  SMqTopicObj      *pTopic = NULL;
  SDbObj           *pDb = NULL;
  SUserObj         *pUser = NULL;
  SCMCreateTopicReq createTopicReq = {0};

  if (tDeserializeSCMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) {
S
Shengliang Guan 已提交
353 354 355
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_TOPIC_OVER;
  }
356 357

  mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
L
Liu Jicong 已提交
358

S
Shengliang Guan 已提交
359
  if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
360
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
S
Shengliang Guan 已提交
361
    goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
362 363
  }

S
Shengliang Guan 已提交
364
  pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
L
Liu Jicong 已提交
365
  if (pTopic != NULL) {
366 367
    if (createTopicReq.igExists) {
      mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
S
Shengliang Guan 已提交
368 369
      code = 0;
      goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
370 371
    } else {
      terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
S
Shengliang Guan 已提交
372
      goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
373
    }
S
Shengliang Guan 已提交
374 375
  } else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
    goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
376 377
  }

S
Shengliang Guan 已提交
378
  pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name);
L
Liu Jicong 已提交
379 380
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
S
Shengliang Guan 已提交
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397
    goto CREATE_TOPIC_OVER;
  }

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    goto CREATE_TOPIC_OVER;
  }

  if (mndCheckWriteAuth(pUser, pDb) != 0) {
    goto CREATE_TOPIC_OVER;
  }

  code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
  if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

CREATE_TOPIC_OVER:
  if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
398
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
L
Liu Jicong 已提交
399 400
  }

S
Shengliang Guan 已提交
401
  mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
402
  mndReleaseDb(pMnode, pDb);
S
Shengliang Guan 已提交
403
  mndReleaseUser(pMnode, pUser);
L
Liu Jicong 已提交
404

L
Liu Jicong 已提交
405
  tFreeSCMCreateTopicReq(&createTopicReq);
S
Shengliang Guan 已提交
406 407 408
  return code;
}

S
Shengliang Guan 已提交
409
static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
410
  // TODO: cannot drop when subscribed
S
Shengliang Guan 已提交
411
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
S
Shengliang Guan 已提交
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
  if (pTrans == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    return -1;
  }
  mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);

  SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
L
Liu Jicong 已提交
429 430 431
    return -1;
  }

S
Shengliang Guan 已提交
432
  mndTransDrop(pTrans);
L
Liu Jicong 已提交
433
  return 0;
L
Liu Jicong 已提交
434 435
}

S
Shengliang Guan 已提交
436 437
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) {
  SMnode        *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
438
  SMDropTopicReq dropReq = {0};
L
Liu Jicong 已提交
439

S
Shengliang Guan 已提交
440 441 442 443 444 445
  if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mDebug("topic:%s, start to drop", dropReq.name);
L
Liu Jicong 已提交
446

S
Shengliang Guan 已提交
447
  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
L
Liu Jicong 已提交
448
  if (pTopic == NULL) {
S
Shengliang Guan 已提交
449 450
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
L
Liu Jicong 已提交
451 452 453
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
S
Shengliang Guan 已提交
454
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
L
Liu Jicong 已提交
455 456 457 458
      return -1;
    }
  }

S
Shengliang Guan 已提交
459
  int32_t code = mndDropTopic(pMnode, pReq, pTopic);
L
Liu Jicong 已提交
460 461 462 463
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    terrno = code;
S
Shengliang Guan 已提交
464
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
L
Liu Jicong 已提交
465 466 467 468 469 470
    return -1;
  }

  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang Guan 已提交
471
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp) {
S
Shengliang Guan 已提交
472
  mndTransProcessRsp(pRsp);
L
Liu Jicong 已提交
473 474 475
  return 0;
}

L
Liu Jicong 已提交
476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t numOfTopics = 0;
  void   *pIter = NULL;
  while (1) {
    SMqTopicObj *pTopic = NULL;
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid == pDb->uid) {
      numOfTopics++;
    }

    sdbRelease(pSdb, pTopic);
  }

  *pNumOfTopics = numOfTopics;
  mndReleaseDb(pMnode, pDb);
  return 0;
}

S
Shengliang Guan 已提交
503 504
static int32_t mndGetTopicMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode *pMnode = pReq->pNode;
L
Liu Jicong 已提交
505 506 507 508 509 510 511
  SSdb   *pSdb = pMnode->pSdb;

  if (mndGetNumOfTopics(pMnode, pShow->db, &pShow->numOfRows) != 0) {
    return -1;
  }

  int32_t  cols = 0;
S
Shengliang Guan 已提交
512
  SSchema *pSchema = pMeta->pSchemas;
L
Liu Jicong 已提交
513 514 515 516

  pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "name");
S
Shengliang Guan 已提交
517
  pSchema[cols].bytes = pShow->bytes[cols];
L
Liu Jicong 已提交
518 519 520 521 522
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "create_time");
S
Shengliang Guan 已提交
523
  pSchema[cols].bytes = pShow->bytes[cols];
L
Liu Jicong 已提交
524 525
  cols++;

S
Shengliang Guan 已提交
526 527 528 529 530 531
  pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

S
Shengliang Guan 已提交
532
  pMeta->numOfColumns = cols;
L
Liu Jicong 已提交
533 534 535 536 537 538 539 540 541
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = sdbGetSize(pSdb, SDB_TOPIC);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
542
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
L
Liu Jicong 已提交
543 544 545 546

  return 0;
}

S
Shengliang Guan 已提交
547 548
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode      *pMnode = pReq->pNode;
L
Liu Jicong 已提交
549 550 551 552 553
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      numOfRows = 0;
  SMqTopicObj *pTopic = NULL;
  int32_t      cols = 0;
  char        *pWrite;
S
Shengliang Guan 已提交
554 555 556 557
  char         prefix[TSDB_DB_FNAME_LEN] = {0};

  SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
  if (pDb == NULL) return 0;
L
Liu Jicong 已提交
558

S
Shengliang Guan 已提交
559
  tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
560 561 562 563 564 565 566
  strcat(prefix, TS_PATH_DELIMITER);
  int32_t prefixLen = (int32_t)strlen(prefix);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) break;

S
Shengliang Guan 已提交
567 568
    if (pTopic->dbUid != pDb->uid) {
      if (strncmp(pTopic->name, prefix, prefixLen) != 0) {
L
Liu Jicong 已提交
569
        mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
S
Shengliang Guan 已提交
570 571
      }

L
Liu Jicong 已提交
572 573 574 575 576 577
      sdbRelease(pSdb, pTopic);
      continue;
    }

    cols = 0;

L
Liu Jicong 已提交
578 579
    char topicName[TSDB_TOPIC_NAME_LEN] = {0};
    tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN);
L
Liu Jicong 已提交
580 581 582 583 584 585 586 587
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_TO_VARSTR(pWrite, topicName);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int64_t *)pWrite = pTopic->createTime;
    cols++;

S
Shengliang Guan 已提交
588 589 590 591
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]);
    cols++;

L
Liu Jicong 已提交
592 593 594 595
    numOfRows++;
    sdbRelease(pSdb, pTopic);
  }

S
Shengliang Guan 已提交
596
  mndReleaseDb(pMnode, pDb);
L
Liu Jicong 已提交
597 598 599 600 601 602 603 604 605
  pShow->numOfReads += numOfRows;
  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  return numOfRows;
}

static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}