mndTopic.c 19.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndTopic.h"
S
Shengliang Guan 已提交
17
#include "mndAuth.h"
L
Liu Jicong 已提交
18 19 20 21
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
22
#include "mndStb.h"
L
Liu Jicong 已提交
23 24 25
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
26
#include "parser.h"
L
Liu Jicong 已提交
27 28
#include "tname.h"

L
Liu Jicong 已提交
29
#define MND_TOPIC_VER_NUMBER 1
S
Shengliang Guan 已提交
30
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32 33 34
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
35 36 37 38 39 40
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
static int32_t mndProcessTopicMetaReq(SNodeMsg *pReq);
static int32_t mndGetTopicMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42 43 44 45 46 47 48 49 50 51

/*
 * Hook the topic module into the mnode: register the message handlers and
 * the "show" handlers implemented in this file, then register the SDB_TOPIC
 * table with the sdb.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitTopic(SMnode *pMnode) {
  // callbacks the sdb uses for rows of type SDB_TOPIC
  SSdbTable topicTable = {
      .sdbType = SDB_TOPIC,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
      .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
      .insertFp = (SdbInsertFp)mndTopicActionInsert,
      .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
      .deleteFp = (SdbDeleteFp)mndTopicActionDelete,
  };

  // request / response messages
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);

  // handlers for the TSDB_MGMT_TABLE_TP show table
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetTopicMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextTopic);

  return sdbSetTable(pMnode->pSdb, topicTable);
}

/* Counterpart of mndInitTopic(); currently there is nothing to tear down. */
void mndCleanupTopic(SMnode *pMnode) {
}

L
Liu Jicong 已提交
65
/*
 * Serialize a topic object into a newly allocated SSdbRaw.
 *
 * Field order written (must match mndTopicActionDecode): name, db,
 * createTime, updateTime, uid, dbUid, version, sqlLen, sql, logicalPlanLen,
 * logicalPlan, physicalPlanLen, physicalPlan, schema length, encoded schema
 * wrapper, reserved bytes.
 *
 * pTopic->logicalPlan and pTopic->physicalPlan must be non-NULL C strings
 * (strlen() is applied to both); they are stored including the terminator.
 *
 * Returns the raw on success. On failure returns NULL with terrno left at a
 * non-success code (TSDB_CODE_OUT_OF_MEMORY unless a callee changed it).
 */
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Scratch buffer for the encoded schema; declared before the first goto so
  // the cleanup label can release it on every path.
  void *swBuf = NULL;

  int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
  int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
  // first pass with a NULL buffer only computes the encoded size
  int32_t swLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
  int32_t size =
      sizeof(SMqTopicObj) + logicalPlanLen + physicalPlanLen + pTopic->sqlLen + swLen + MND_TOPIC_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
  if (pRaw == NULL) goto TOPIC_ENCODE_OVER;

  int32_t dataPos = 0;
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);

  swBuf = taosMemoryMalloc(swLen);
  if (swBuf == NULL) {
    goto TOPIC_ENCODE_OVER;
  }
  // the encoder advances the pointer it is given, so hand it a copy
  void *aswBuf = swBuf;
  taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
  SDB_SET_INT32(pRaw, dataPos, swLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, swBuf, swLen, TOPIC_ENCODE_OVER);

  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_ENCODE_OVER:
  // The schema bytes are already stored in pRaw (or never will be), so the
  // scratch buffer is released on success and failure alike.
  taosMemoryFreeClear(swBuf);

  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
  return pRaw;
}

L
Liu Jicong 已提交
116
/*
 * Rebuild a topic row from its raw sdb encoding (the inverse of
 * mndTopicActionEncode; fields are read in the same order they are written).
 *
 * sql, logicalPlan and physicalPlan are heap-allocated here with one extra
 * zero byte, and are owned by the returned row's SMqTopicObj.
 *
 * Returns the row on success. On failure returns NULL, releases everything
 * allocated in this function (apart from anything taosDecodeSSchemaWrapper
 * may have allocated) and leaves terrno at a non-success code.
 */
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Everything the cleanup label touches is initialized before the first
  // goto, so early failures never read indeterminate pointers.
  SSdbRow     *pRow = NULL;
  SMqTopicObj *pTopic = NULL;
  void        *buf = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;

  if (sver != MND_TOPIC_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto TOPIC_DECODE_OVER;
  }

  int32_t size = sizeof(SMqTopicObj);
  pRow = sdbAllocRow(size);
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

  pTopic = sdbGetRowObj(pRow);
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;

  // Make the error path safe regardless of how sdbAllocRow() initializes
  // the row memory.
  pTopic->sql = NULL;
  pTopic->logicalPlan = NULL;
  pTopic->physicalPlan = NULL;

  int32_t len;
  int32_t dataPos = 0;
  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);

  // +1 keeps the sql text NUL-terminated (the encoder stores sqlLen bytes)
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen + 1, sizeof(char));
  if (pTopic->sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  pTopic->logicalPlan = taosMemoryCalloc(len + 1, sizeof(char));
  if (pTopic->logicalPlan == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  pTopic->physicalPlan = taosMemoryCalloc(len + 1, sizeof(char));
  if (pTopic->physicalPlan == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);

  // The encoded schema wrapper is copied into a scratch buffer and decoded
  // from there.
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
  if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
    goto TOPIC_DECODE_OVER;
  }

  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_DECODE_OVER:
  // NOTE(review): assumes taosDecodeSSchemaWrapper copies out of buf and the
  // decoded schema keeps no pointers into it - confirm against its definition.
  taosMemoryFreeClear(buf);

  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic == NULL ? "null" : pTopic->name, pRaw,
           terrstr());
    if (pTopic != NULL) {
      taosMemoryFreeClear(pTopic->sql);
      taosMemoryFreeClear(pTopic->logicalPlan);
      taosMemoryFreeClear(pTopic->physicalPlan);
    }
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
  return pRow;
}

L
Liu Jicong 已提交
190
/* sdb insert callback for topic rows: only traces the event, always returns 0. */
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
  (void)pSdb;  // not needed by this callback
  mTrace("topic:%s, perform insert action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
195
/* sdb delete callback for topic rows: only traces the event, always returns 0. */
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
  (void)pSdb;  // not needed by this callback
  mTrace("topic:%s, perform delete action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
200
/*
 * sdb update callback for topic rows.
 *
 * Copies updateTime and version from the new object into the existing one
 * with atomic exchanges, then takes and releases the object's write latch
 * (the critical section is still empty, see the TODO). Always returns 0.
 */
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
  mTrace("topic:%s, perform update action", pOldTopic->name);

  // these two fields are swapped in before the latch is taken
  atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

  taosWLockLatch(&pOldTopic->lock);
  // TODO handle update
  taosWUnLockLatch(&pOldTopic->lock);

  return 0;
}

L
Liu Jicong 已提交
213 214 215
/*
 * Acquire the topic named topicName from the sdb.
 *
 * Returns the object, or NULL on failure. A TSDB_CODE_SDB_OBJ_NOT_THERE
 * failure is reported as TSDB_CODE_MND_TOPIC_NOT_EXIST in terrno; any other
 * terrno value is left as is. Pair with mndReleaseTopic().
 */
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SMqTopicObj *pFound = sdbAcquire(pMnode->pSdb, SDB_TOPIC, topicName);
  if (pFound != NULL) {
    return pFound;
  }

  // translate the generic sdb error into the topic-specific one
  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return NULL;
}

L
Liu Jicong 已提交
222
/* Hand a topic obtained from mndAcquireTopic() back to the sdb. */
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
  sdbRelease(pMnode->pSdb, pTopic);
}

/*
 * Acquire the database a topic belongs to.
 *
 * topicName is parsed as an account/db/table style name, the full db name is
 * extracted from it, and that db is acquired with mndAcquireDb(). The result
 * of mndAcquireDb() is returned unchanged.
 */
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
  SName parsed = {0};
  char  dbFName[TSDB_TOPIC_FNAME_LEN] = {0};

  // the return value of the parse is not checked; `parsed` stays zeroed
  // apart from whatever tNameFromString wrote
  tNameFromString(&parsed, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  tNameGetFullDbName(&parsed, dbFName);

  return mndAcquireDb(pMnode, dbFName);
}

S
Shengliang Guan 已提交
237 238
/*
 * Allocate and fill a drop-topic request addressed to pVgroup.
 *
 * The header length and vgroup id are stored with htonl(), the topic uid
 * with htobe64(). Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation fails; otherwise the caller receives the allocated message.
 */
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  const int32_t msgLen = sizeof(SDDropTopicReq);

  SDDropTopicReq *pMsg = taosMemoryCalloc(1, msgLen);
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // message header
  pMsg->head.contLen = htonl(msgLen);
  pMsg->head.vgId = htonl(pVgroup->vgId);

  // payload
  memcpy(pMsg->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
  pMsg->tuid = htobe64(pTopic->uid);

  return pMsg;
}

L
Liu Jicong 已提交
254
/*
 * Validate a create-topic request: the name must be non-empty and the sql
 * text must be present and non-empty.
 *
 * Returns 0 when valid, otherwise -1 with terrno set to
 * TSDB_CODE_MND_INVALID_TOPIC_OPTION.
 */
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
  if (pCreate->name[0] != 0 && pCreate->sql != NULL && pCreate->sql[0] != 0) {
    return 0;
  }

  terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
  return -1;
}

L
Liu Jicong 已提交
262
static int32_t mndGetPlanString(const SCMCreateTopicReq *pCreate, char **pStr) {
263 264 265 266
  if (NULL == pCreate->ast) {
    return TSDB_CODE_SUCCESS;
  }

L
Liu Jicong 已提交
267
  SNode  *pAst = NULL;
268 269
  int32_t code = nodesStringToNode(pCreate->ast, &pAst);

L
Liu Jicong 已提交
270
  SQueryPlan *pPlan = NULL;
271
  if (TSDB_CODE_SUCCESS == code) {
L
Liu Jicong 已提交
272
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
273 274 275 276 277 278 279 280 281 282 283 284
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = nodesNodeToString(pPlan, false, pStr, NULL);
  }
  nodesDestroyNode(pAst);
  nodesDestroyNode(pPlan);
  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
285
/*
 * Build a topic object from a create-topic request and submit a transaction
 * that writes it to the sdb with status SDB_STATUS_READY.
 *
 * Steps: fill the object from the request and owning db, derive the physical
 * plan string and result schema from the request's AST, then create, log and
 * prepare a TRN_TYPE_CREATE_TOPIC transaction.
 *
 * Returns 0 when the transaction was prepared, -1 otherwise. The plan
 * string, the parsed AST and the transaction handle are released on every
 * path through the single exit label.
 */
static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
  mDebug("topic:%s to create", pCreate->name);

  int32_t  code = -1;
  char    *pPlanStr = NULL;
  SNode   *pAst = NULL;
  STrans  *pTrans = NULL;
  SSdbRaw *pRedoRaw = NULL;

  SMqTopicObj topicObj = {0};
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;
  topicObj.sql = pCreate->sql;  // borrowed from the request, not freed here
  // empty strings keep strlen() in the encoder well-defined
  topicObj.physicalPlan = "";
  topicObj.logicalPlan = "";
  topicObj.sqlLen = strlen(pCreate->sql);

  if (TSDB_CODE_SUCCESS != mndGetPlanString(pCreate, &pPlanStr)) {
    mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_OVER;
  }
  if (NULL != pPlanStr) {
    topicObj.physicalPlan = pPlanStr;
  }

  // NOTE(review): mndGetPlanString tolerates a NULL ast but this call does
  // not guard against it - confirm nodesStringToNode handles NULL input.
  if (nodesStringToNode(pCreate->ast, &pAst) < 0) {
    goto CREATE_TOPIC_OVER;
  }
  if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
    goto CREATE_TOPIC_OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_OVER;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

  pRedoRaw = mndTopicActionEncode(&topicObj);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
    goto CREATE_TOPIC_OVER;
  }
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto CREATE_TOPIC_OVER;
  }

  code = 0;

CREATE_TOPIC_OVER:
  // NOTE(review): topicObj.schema.pSchema is filled by qExtractResultSchema
  // and is not released here; confirm its ownership/allocator and free it if
  // this function owns it.
  nodesDestroyNode(pAst);
  taosMemoryFreeClear(pPlanStr);
  if (pTrans != NULL) {
    mndTransDrop(pTrans);
  }
  return code;
}

S
Shengliang Guan 已提交
346 347
/*
 * Handler for TDMT_MND_CREATE_TOPIC.
 *
 * Deserializes and validates the request, rejects (or, with igExists,
 * silently accepts) an already existing topic, resolves the owning db from
 * the topic name, checks the requesting user's write permission on that db
 * and finally calls mndCreateTopic().
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS when the create transaction was
 * started, 0 when the topic already exists and igExists is set, and the
 * failing code (-1 unless mndCreateTopic returned something else) otherwise.
 */
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq) {
  SMnode           *pMnode = pReq->pNode;
  int32_t           code = -1;
  SMqTopicObj      *pExisting = NULL;
  SDbObj           *pDb = NULL;
  SUserObj         *pUser = NULL;
  SCMCreateTopicReq req = {0};

  if (tDeserializeSCMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_TOPIC_DONE;
  }

  mDebug("topic:%s, start to create, sql:%s", req.name, req.sql);

  if (mndCheckCreateTopicReq(&req) != 0) {
    mError("topic:%s, failed to create since %s", req.name, terrstr());
    goto CREATE_TOPIC_DONE;
  }

  pExisting = mndAcquireTopic(pMnode, req.name);
  if (pExisting == NULL) {
    // only "topic does not exist" lets the creation continue
    if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
      goto CREATE_TOPIC_DONE;
    }
  } else if (req.igExists) {
    mDebug("topic:%s, already exist, ignore exist is set", req.name);
    code = 0;
    goto CREATE_TOPIC_DONE;
  } else {
    terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
    goto CREATE_TOPIC_DONE;
  }

  pDb = mndAcquireDbByTopic(pMnode, req.name);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto CREATE_TOPIC_DONE;
  }

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    goto CREATE_TOPIC_DONE;
  }

  if (mndCheckWriteAuth(pUser, pDb) != 0) {
    goto CREATE_TOPIC_DONE;
  }

  code = mndCreateTopic(pMnode, pReq, &req, pDb);
  if (code == 0) {
    code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
  }

CREATE_TOPIC_DONE:
  if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mError("topic:%s, failed to create since %s", req.name, terrstr());
  }

  // release whatever was acquired; pointers that were never set are still NULL
  mndReleaseTopic(pMnode, pExisting);
  mndReleaseDb(pMnode, pDb);
  mndReleaseUser(pMnode, pUser);

  tFreeSCMCreateTopicReq(&req);
  return code;
}

S
Shengliang Guan 已提交
411
/*
 * Submit a transaction that marks pTopic as SDB_STATUS_DROPPED in the sdb.
 *
 * Returns 0 when the transaction was prepared, -1 otherwise. The transaction
 * handle is dropped before returning on every path where it was created.
 */
static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic) {
  // TODO: cannot drop when subscribed
  STrans *pTxn = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
  if (pTxn == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    return -1;
  }
  mDebug("trans:%d, used to drop topic:%s", pTxn->id, pTopic->name);

  SSdbRaw *pRaw = mndTopicActionEncode(pTopic);
  if (pRaw == NULL || mndTransAppendRedolog(pTxn, pRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTxn->id, terrstr());
    mndTransDrop(pTxn);
    return -1;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

  int32_t result = 0;
  if (mndTransPrepare(pMnode, pTxn) != 0) {
    mError("trans:%d, failed to prepare since %s", pTxn->id, terrstr());
    result = -1;
  }

  mndTransDrop(pTxn);
  return result;
}

S
Shengliang Guan 已提交
438 439
/*
 * Handler for TDMT_MND_DROP_TOPIC.
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS when the drop transaction was
 * started, 0 when the topic could not be acquired and igNotExists is set,
 * and -1 with terrno describing the failure otherwise.
 */
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) {
  SMnode        *pMnode = pReq->pNode;
  SMDropTopicReq dropReq = {0};

  if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mDebug("topic:%s, start to drop", dropReq.name);

  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
  if (pTopic == NULL) {
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
      return -1;
    }
  }

  int32_t code = mndDropTopic(pMnode, pReq, pTopic);
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    // mndDropTopic() returns -1 and its failing callee is the one that sets
    // terrno; keep that value instead of overwriting it with the bare -1
    // return code, so the logged reason is the real one.
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
    return -1;
  }

  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang Guan 已提交
473
/*
 * Handler for TDMT_VND_DROP_TOPIC_RSP: forwards the response to
 * mndTransProcessRsp() and returns 0 unconditionally.
 */
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);

  return 0;
}

L
Liu Jicong 已提交
478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504
/*
 * Count the topics whose dbUid matches the database named dbName.
 *
 * Returns 0 and stores the count in *pNumOfTopics, or returns -1 with
 * terrno = TSDB_CODE_MND_DB_NOT_SELECTED when the db cannot be acquired
 * (*pNumOfTopics is not written in that case).
 */
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t matched = 0;
  void   *pCursor = NULL;
  for (;;) {
    SMqTopicObj *pEntry = NULL;
    pCursor = sdbFetch(pSdb, SDB_TOPIC, pCursor, (void **)&pEntry);
    if (pCursor == NULL) {
      break;  // iteration finished
    }

    if (pEntry->dbUid == pDb->uid) {
      ++matched;
    }

    // every fetched object is released again
    sdbRelease(pSdb, pEntry);
  }

  mndReleaseDb(pMnode, pDb);
  *pNumOfTopics = matched;
  return 0;
}

S
Shengliang Guan 已提交
505 506
/*
 * "show" meta handler for topics: describes the result columns
 * (name, create_time, sql) in pMeta and fills the layout fields of pShow
 * (bytes, offset, numOfColumns, rowSize, numOfRows).
 *
 * Returns 0 on success, -1 when mndGetNumOfTopics() fails.
 */
static int32_t mndGetTopicMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode *pMnode = pReq->pNode;
  SSdb   *pSdb = pMnode->pSdb;

  if (mndGetNumOfTopics(pMnode, pShow->db, &pShow->numOfRows) != 0) {
    return -1;
  }

  SSchema *pSchema = pMeta->pSchemas;
  int32_t  cols = 0;

  // column 0: topic name
  pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].bytes = pShow->bytes[cols];
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "name");
  cols++;

  // column 1: creation timestamp
  pShow->bytes[cols] = 8;
  pSchema[cols].bytes = pShow->bytes[cols];
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "create_time");
  cols++;

  // column 2: sql text the topic was created from
  pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].bytes = pShow->bytes[cols];
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql");
  cols++;

  pMeta->numOfColumns = cols;
  pShow->numOfColumns = cols;

  // each column starts where the previous one ends
  pShow->offset[0] = 0;
  for (int32_t c = 1; c < cols; ++c) {
    pShow->offset[c] = pShow->offset[c - 1] + pShow->bytes[c - 1];
  }

  // NOTE(review): this replaces the per-db count stored by
  // mndGetNumOfTopics() above with the size of the whole SDB_TOPIC table -
  // confirm that is intended.
  pShow->numOfRows = sdbGetSize(pSdb, SDB_TOPIC);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
  strcpy(pMeta->tbName, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
549 550
/*
 * "show" retrieve handler for topics: writes up to `rows` topics of the
 * database pShow->db into `data` and returns how many were written.
 *
 * Iteration continues from pShow->pIter, so repeated calls walk the whole
 * SDB_TOPIC table. Topics whose dbUid differs from the db's uid are skipped.
 * Cells are written per column: column c starts at offset[c] * rows and each
 * of its cells is bytes[c] wide. Returns 0 without writing anything when the
 * db cannot be acquired.
 */
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode      *pMnode = pReq->pNode;
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      numOfRows = 0;
  SMqTopicObj *pTopic = NULL;
  int32_t      cols = 0;
  char        *pWrite;
  char         prefix[TSDB_DB_FNAME_LEN] = {0};

  SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
  if (pDb == NULL) return 0;

  // prefix = "<db full name><delimiter>"; the append is bounded so a db name
  // that already fills the buffer cannot overflow it
  tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
  strncat(prefix, TS_PATH_DELIMITER, sizeof(prefix) - strlen(prefix) - 1);
  int32_t prefixLen = (int32_t)strlen(prefix);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) break;

    if (pTopic->dbUid != pDb->uid) {
      // A topic that carries this db's name prefix but a different db uid is
      // inconsistent; topics of other databases are skipped silently.
      if (strncmp(pTopic->name, prefix, prefixLen) == 0) {
        mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
      }

      sdbRelease(pSdb, pTopic);
      continue;
    }

    cols = 0;

    // column 0: topic name with the db prefix stripped
    char topicName[TSDB_TOPIC_NAME_LEN] = {0};
    tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN);
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_TO_VARSTR(pWrite, topicName);
    cols++;

    // column 1: creation timestamp
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int64_t *)pWrite = pTopic->createTime;
    cols++;

    // column 2: sql text, limited to the column width
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]);
    cols++;

    numOfRows++;
    sdbRelease(pSdb, pTopic);
  }

  mndReleaseDb(pMnode, pDb);
  pShow->numOfReads += numOfRows;
  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  return numOfRows;
}

/* "show" free-iterator handler: cancels an in-progress sdb fetch for pIter. */
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}