mndTopic.c 27.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndTopic.h"
L
Liu Jicong 已提交
17
#include "mndConsumer.h"
L
Liu Jicong 已提交
18 19 20
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
21
#include "mndOffset.h"
22
#include "mndPrivilege.h"
L
Liu Jicong 已提交
23
#include "mndShow.h"
S
Shengliang Guan 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndSubscribe.h"
L
Liu Jicong 已提交
26 27 28
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
29
#include "parser.h"
L
Liu Jicong 已提交
30 31
#include "tname.h"

L
Liu Jicong 已提交
32
#define MND_TOPIC_VER_NUMBER   1
S
Shengliang Guan 已提交
33
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
34

L
Liu Jicong 已提交
35 36 37
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
38 39
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
L
Liu Jicong 已提交
40

S
Shengliang Guan 已提交
41
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
42
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
43

L
Liu Jicong 已提交
44 45
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);

L
Liu Jicong 已提交
46
int32_t mndInitTopic(SMnode *pMnode) {
47 48 49 50 51 52 53 54 55
  SSdbTable table = {
      .sdbType = SDB_TOPIC,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
      .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
      .insertFp = (SdbInsertFp)mndTopicActionInsert,
      .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
      .deleteFp = (SdbDeleteFp)mndTopicActionDelete,
  };
L
Liu Jicong 已提交
56

S
Shengliang Guan 已提交
57 58
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
59
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
L
Liu Jicong 已提交
60
  mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp);
L
Liu Jicong 已提交
61

L
Liu Jicong 已提交
62
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
63
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
S
Shengliang Guan 已提交
64

L
Liu Jicong 已提交
65 66 67 68 69
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupTopic(SMnode *pMnode) {}

L
Liu Jicong 已提交
70 71 72 73 74
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
  //
  return strchr(topic, '.') + 1;
}

L
Liu Jicong 已提交
75
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
76 77
  terrno = TSDB_CODE_OUT_OF_MEMORY;

L
Liu Jicong 已提交
78 79 80 81 82 83 84
  void   *swBuf = NULL;
  int32_t physicalPlanLen = 0;
  if (pTopic->physicalPlan) {
    physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
  }
  int32_t schemaLen = 0;
  if (pTopic->schema.nCols) {
L
Liu Jicong 已提交
85
    schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
L
Liu Jicong 已提交
86
  }
L
Liu Jicong 已提交
87 88 89 90
  int32_t ntbColLen = taosArrayGetSize(pTopic->ntbColIds) * sizeof(int16_t);

  int32_t size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + ntbColLen +
                 MND_TOPIC_RESERVE_SIZE;
S
Shengliang Guan 已提交
91
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
L
Liu Jicong 已提交
92
  if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
L
Liu Jicong 已提交
93 94

  int32_t dataPos = 0;
L
Liu Jicong 已提交
95
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
96 97 98 99 100 101
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
102
  SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
103
  SDB_SET_INT8(pRaw, dataPos, pTopic->withMeta, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
104

L
Liu Jicong 已提交
105
  SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
106 107
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
108
  SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
109 110 111
  if (pTopic->astLen) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
  }
L
Liu Jicong 已提交
112
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
113 114
  if (physicalPlanLen) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
115
  }
L
Liu Jicong 已提交
116
  SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
117 118 119 120 121 122 123 124 125
  if (schemaLen) {
    swBuf = taosMemoryMalloc(schemaLen);
    if (swBuf == NULL) {
      goto TOPIC_ENCODE_OVER;
    }
    void *aswBuf = swBuf;
    taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
    SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
  }
L
Liu Jicong 已提交
126 127 128 129 130 131 132 133 134
  SDB_SET_INT64(pRaw, dataPos, pTopic->ntbUid, TOPIC_ENCODE_OVER);
  if (pTopic->ntbUid != 0) {
    int32_t sz = taosArrayGetSize(pTopic->ntbColIds);
    SDB_SET_INT32(pRaw, dataPos, sz, TOPIC_ENCODE_OVER);
    for (int32_t i = 0; i < sz; i++) {
      int16_t colId = *(int16_t *)taosArrayGet(pTopic->ntbColIds, i);
      SDB_SET_INT16(pRaw, dataPos, colId, TOPIC_ENCODE_OVER);
    }
  }
L
Liu Jicong 已提交
135
  SDB_SET_INT64(pRaw, dataPos, pTopic->ctbStbUid, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
136

L
Liu Jicong 已提交
137 138 139 140 141 142
  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_ENCODE_OVER:
L
Liu Jicong 已提交
143
  if (swBuf) taosMemoryFree(swBuf);
L
Liu Jicong 已提交
144 145 146 147 148 149 150
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
151 152 153
  return pRaw;
}

L
Liu Jicong 已提交
154
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
155
  terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
156
  int8_t sver = 0;
157
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
158

S
Shengliang Guan 已提交
159
  if (sver != MND_TOPIC_VER_NUMBER) {
L
Liu Jicong 已提交
160
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
161
    goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
162 163
  }

L
Liu Jicong 已提交
164 165
  int32_t  size = sizeof(SMqTopicObj);
  SSdbRow *pRow = sdbAllocRow(size);
166 167
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

L
Liu Jicong 已提交
168
  SMqTopicObj *pTopic = sdbGetRowObj(pRow);
169
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
170

L
Liu Jicong 已提交
171
  int32_t len;
L
Liu Jicong 已提交
172
  int32_t dataPos = 0;
L
Liu Jicong 已提交
173
  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
174 175 176 177 178 179
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
180
  SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
181
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
182

L
Liu Jicong 已提交
183
  SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
184 185 186 187 188 189
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
  if (pTopic->sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
L
Liu Jicong 已提交
190
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
191

L
Liu Jicong 已提交
192
  SDB_GET_INT32(pRaw, dataPos, &pTopic->astLen, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
193 194 195 196 197 198 199 200
  if (pTopic->astLen) {
    pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char));
    if (pTopic->ast == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
  } else {
    pTopic->ast = NULL;
201
  }
L
Liu Jicong 已提交
202
  SDB_GET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_DECODE_OVER);
203
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
204 205 206 207 208 209 210 211 212
  if (len) {
    pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
    if (pTopic->physicalPlan == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
  } else {
    pTopic->physicalPlan = NULL;
213
  }
214

L
Liu Jicong 已提交
215
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
216 217 218 219 220 221 222 223 224 225 226 227
  if (len) {
    void *buf = taosMemoryMalloc(len);
    if (buf == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
    if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
      goto TOPIC_DECODE_OVER;
    }
  } else {
    pTopic->schema.nCols = 0;
228
    pTopic->schema.version = 0;
L
Liu Jicong 已提交
229
    pTopic->schema.pSchema = NULL;
L
Liu Jicong 已提交
230
  }
L
Liu Jicong 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243
  SDB_GET_INT64(pRaw, dataPos, &pTopic->ntbUid, TOPIC_DECODE_OVER);
  if (pTopic->ntbUid != 0) {
    int32_t ntbColNum;
    SDB_GET_INT32(pRaw, dataPos, &ntbColNum, TOPIC_DECODE_OVER);
    pTopic->ntbColIds = taosArrayInit(ntbColNum, sizeof(int16_t));
    if (pTopic->ntbColIds == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    int16_t colId;
    SDB_GET_INT16(pRaw, dataPos, &colId, TOPIC_DECODE_OVER);
    taosArrayPush(pTopic->ntbColIds, &colId);
  }
L
Liu Jicong 已提交
244
  SDB_GET_INT64(pRaw, dataPos, &pTopic->ctbStbUid, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
245

L
Liu Jicong 已提交
246
  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
247

L
Liu Jicong 已提交
248
  terrno = TSDB_CODE_SUCCESS;
249 250

TOPIC_DECODE_OVER:
L
Liu Jicong 已提交
251
  if (terrno != TSDB_CODE_SUCCESS) {
252
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
253
    taosMemoryFreeClear(pRow);
254 255
    return NULL;
  }
L
Liu Jicong 已提交
256

257
  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
258 259 260
  return pRow;
}

L
Liu Jicong 已提交
261
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
262 263 264 265
  mTrace("topic:%s, perform insert action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
266
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
267 268 269 270
  mTrace("topic:%s, perform delete action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
271
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
L
Liu Jicong 已提交
272
  mTrace("topic:%s, perform update action", pOldTopic->name);
wafwerar's avatar
wafwerar 已提交
273
  atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
L
Liu Jicong 已提交
274 275
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

L
Liu Jicong 已提交
276
  /*taosWLockLatch(&pOldTopic->lock);*/
L
Liu Jicong 已提交
277 278

  // TODO handle update
L
Liu Jicong 已提交
279

L
Liu Jicong 已提交
280
  /*taosWUnLockLatch(&pOldTopic->lock);*/
L
Liu Jicong 已提交
281 282 283
  return 0;
}

L
Liu Jicong 已提交
284 285 286
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SSdb        *pSdb = pMnode->pSdb;
  SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
S
Shengliang Guan 已提交
287
  if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
L
Liu Jicong 已提交
288 289 290 291 292
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return pTopic;
}

L
Liu Jicong 已提交
293
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
294 295 296 297
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pTopic);
}

S
Shengliang Guan 已提交
298 299
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  int32_t contLen = sizeof(SDDropTopicReq);
L
Liu Jicong 已提交
300

wafwerar's avatar
wafwerar 已提交
301
  SDDropTopicReq *pDrop = taosMemoryCalloc(1, contLen);
L
Liu Jicong 已提交
302 303 304 305 306 307 308
  if (pDrop == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pDrop->head.contLen = htonl(contLen);
  pDrop->head.vgId = htonl(pVgroup->vgId);
L
Liu Jicong 已提交
309
  memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
L
Liu Jicong 已提交
310 311 312 313 314
  pDrop->tuid = htobe64(pTopic->uid);

  return pDrop;
}

L
Liu Jicong 已提交
315
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
L
Liu Jicong 已提交
316 317 318 319 320 321 322 323 324 325
  terrno = TSDB_CODE_MND_INVALID_TOPIC;

  if (pCreate->sql == NULL) return -1;

  if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
    if (pCreate->ast == NULL || pCreate->ast[0] == 0) return -1;
  } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
    if (pCreate->subStbName[0] == 0) return -1;
  } else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {
    if (pCreate->subDbName[0] == 0) return -1;
S
Shengliang Guan 已提交
326
  }
327

L
Liu Jicong 已提交
328
  terrno = TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
329
  return 0;
330 331
}

L
Liu Jicong 已提交
332
static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
333 334
  SNodeList *pNodeList = NULL;
  nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
L
Liu Jicong 已提交
335 336 337 338 339 340 341 342 343 344 345 346 347
  int64_t suid = ((SRealTableNode *)((SSelectStmt *)pAst)->pFromTable)->pMeta->suid;
  int8_t  tableType = ((SRealTableNode *)((SSelectStmt *)pAst)->pFromTable)->pMeta->tableType;
  if (tableType == TSDB_CHILD_TABLE) {
    pTopic->ctbStbUid = suid;
  } else if (tableType == TSDB_NORMAL_TABLE) {
    SNode *pNode = NULL;
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;
      if (pCol->tableType == TSDB_NORMAL_TABLE) {
        pTopic->ntbUid = pCol->tableId;
        taosArrayPush(pTopic->ntbColIds, &pCol->colId);
      }
    }
L
Liu Jicong 已提交
348 349 350 351
  }
  return 0;
}

S
Shengliang Guan 已提交
352
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
L
Liu Jicong 已提交
353
  mDebug("topic:%s to create", pCreate->name);
L
Liu Jicong 已提交
354
  SMqTopicObj topicObj = {0};
355
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
356
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
357 358
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
L
Liu Jicong 已提交
359
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
L
Liu Jicong 已提交
360 361
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;
L
Liu Jicong 已提交
362 363
  topicObj.sql = strdup(pCreate->sql);
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
L
Liu Jicong 已提交
364
  topicObj.subType = pCreate->subType;
L
Liu Jicong 已提交
365 366 367 368
  topicObj.withMeta = pCreate->withMeta;
  if (topicObj.withMeta) {
    ASSERT(topicObj.subType != TOPIC_SUB_TYPE__COLUMN);
  }
L
Liu Jicong 已提交
369

L
Liu Jicong 已提交
370
  if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
371 372
    topicObj.ast = strdup(pCreate->ast);
    topicObj.astLen = strlen(pCreate->ast) + 1;
L
Liu Jicong 已提交
373

L
Liu Jicong 已提交
374 375
    SNode *pAst = NULL;
    if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
L
Liu Jicong 已提交
376 377
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
378 379 380
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      return -1;
    }
L
Liu Jicong 已提交
381

L
Liu Jicong 已提交
382
    SQueryPlan *pPlan = NULL;
L
Liu Jicong 已提交
383

L
Liu Jicong 已提交
384 385 386
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
387 388
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
389 390 391
      return -1;
    }

L
Liu Jicong 已提交
392
    int64_t ntbUid;
L
Liu Jicong 已提交
393 394
    topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
    if (topicObj.ntbColIds == NULL) {
L
Liu Jicong 已提交
395 396 397
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
L
Liu Jicong 已提交
398 399 400 401 402
    extractTopicTbInfo(pAst, &topicObj);

    if (topicObj.ntbUid == 0) {
      taosArrayDestroy(topicObj.ntbColIds);
      topicObj.ntbColIds = NULL;
L
Liu Jicong 已提交
403 404
    }

L
Liu Jicong 已提交
405 406
    if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
407 408
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
409 410 411
      return -1;
    }

412
    if (nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
L
Liu Jicong 已提交
413
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
414 415
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
416 417
      return -1;
    }
L
Liu Jicong 已提交
418
  } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
L
Liu Jicong 已提交
419
    SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
420 421 422 423
    if (pStb == NULL) {
      terrno = TSDB_CODE_MND_STB_NOT_EXIST;
      return -1;
    }
L
Liu Jicong 已提交
424
    topicObj.stbUid = pStb->uid;
L
Liu Jicong 已提交
425
  }
L
Liu Jicong 已提交
426 427 428 429 430 431
  /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
  /*topicObj.ast = NULL;*/
  /*topicObj.astLen = 0;*/
  /*topicObj.physicalPlan = NULL;*/
  /*topicObj.withTbName = 1;*/
  /*topicObj.withSchema = 1;*/
L
Liu Jicong 已提交
432

433
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
S
Shengliang Guan 已提交
434 435
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
436 437
    taosMemoryFreeClear(topicObj.ast);
    taosMemoryFreeClear(topicObj.sql);
L
Liu Jicong 已提交
438
    taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
439 440 441 442
    return -1;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

443 444 445
  SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
446
    taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
447 448 449
    mndTransDrop(pTrans);
    return -1;
  }
450
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
S
Shengliang Guan 已提交
451

L
Liu Jicong 已提交
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476
  if (topicObj.ntbUid != 0) {
    SCheckAlterInfo info;
    memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
    info.ntbUid = topicObj.ntbUid;
    info.colIdList = topicObj.ntbColIds;
    // broadcast forbid alter info
    void   *pIter = NULL;
    SSdb   *pSdb = pMnode->pSdb;
    SVgObj *pVgroup = NULL;
    while (1) {
      // iterate vg
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
      if (pIter == NULL) break;
      if (!mndVgroupInDb(pVgroup, topicObj.dbUid)) {
        sdbRelease(pSdb, pVgroup);
        continue;
      }

      // encoder check alter info
      int32_t len;
      int32_t code;
      tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code);
      if (code < 0) {
        sdbRelease(pSdb, pVgroup);
        mndTransDrop(pTrans);
L
Liu Jicong 已提交
477
        ASSERT(0);
L
Liu Jicong 已提交
478 479 480 481 482 483 484 485 486 487 488 489
        return -1;
      }
      void    *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
      void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
      SEncoder encoder;
      tEncoderInit(&encoder, abuf, len);
      if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) {
        sdbRelease(pSdb, pVgroup);
        mndTransDrop(pTrans);
        return -1;
      }
      tEncoderClear(&encoder);
L
Liu Jicong 已提交
490
      ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
L
Liu Jicong 已提交
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505
      // add redo action
      STransAction action = {0};
      action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
      action.pCont = buf;
      action.contLen = sizeof(SMsgHead) + len;
      action.msgType = TDMT_VND_CHECK_ALTER_INFO;
      if (mndTransAppendRedoAction(pTrans, &action) != 0) {
        taosMemoryFree(buf);
        sdbRelease(pSdb, pVgroup);
        mndTransDrop(pTrans);
        return -1;
      }
    }
  }

S
Shengliang Guan 已提交
506 507
  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
508
    taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
509 510 511 512
    mndTransDrop(pTrans);
    return -1;
  }

L
Liu Jicong 已提交
513
  taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
514
  mndTransDrop(pTrans);
L
Liu Jicong 已提交
515
  return TSDB_CODE_ACTION_IN_PROGRESS;
L
Liu Jicong 已提交
516 517
}

S
Shengliang Guan 已提交
518 519
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
L
Liu Jicong 已提交
520 521 522 523 524
  int32_t           code = -1;
  SMqTopicObj      *pTopic = NULL;
  SDbObj           *pDb = NULL;
  SCMCreateTopicReq createTopicReq = {0};

S
Shengliang Guan 已提交
525
  if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
S
Shengliang Guan 已提交
526
    terrno = TSDB_CODE_INVALID_MSG;
527
    goto _OVER;
S
Shengliang Guan 已提交
528
  }
529 530

  mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
L
Liu Jicong 已提交
531

S
Shengliang Guan 已提交
532
  if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
533
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
534
    goto _OVER;
L
Liu Jicong 已提交
535 536
  }

S
Shengliang Guan 已提交
537
  pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
L
Liu Jicong 已提交
538
  if (pTopic != NULL) {
539 540
    if (createTopicReq.igExists) {
      mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
S
Shengliang Guan 已提交
541
      code = 0;
542
      goto _OVER;
L
Liu Jicong 已提交
543 544
    } else {
      terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
545
      goto _OVER;
L
Liu Jicong 已提交
546
    }
S
Shengliang Guan 已提交
547
  } else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
548
    goto _OVER;
L
Liu Jicong 已提交
549 550
  }

L
Liu Jicong 已提交
551
  pDb = mndAcquireDb(pMnode, createTopicReq.subDbName);
L
Liu Jicong 已提交
552 553
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
554
    goto _OVER;
S
Shengliang Guan 已提交
555 556
  }

557 558 559 560 561
  if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pDb) != 0) {
    goto _OVER;
  }

  code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
S
Shengliang Guan 已提交
562
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
S
Shengliang Guan 已提交
563

564
_OVER:
S
Shengliang Guan 已提交
565
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
566
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
L
Liu Jicong 已提交
567 568
  }

S
Shengliang Guan 已提交
569
  mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
570 571
  mndReleaseDb(pMnode, pDb);

L
Liu Jicong 已提交
572
  tFreeSCMCreateTopicReq(&createTopicReq);
S
Shengliang Guan 已提交
573 574 575
  return code;
}

576
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
577 578 579
  SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
580 581 582
    mndTransDrop(pTrans);
    return -1;
  }
583
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
S
Shengliang Guan 已提交
584 585 586 587

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
L
Liu Jicong 已提交
588 589 590
    return -1;
  }

S
Shengliang Guan 已提交
591
  mndTransDrop(pTrans);
L
Liu Jicong 已提交
592
  return 0;
L
Liu Jicong 已提交
593 594
}

S
Shengliang Guan 已提交
595
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
L
Liu Jicong 已提交
596 597
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
598
  SMDropTopicReq dropReq = {0};
L
Liu Jicong 已提交
599

S
Shengliang Guan 已提交
600
  if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
S
Shengliang Guan 已提交
601
    terrno = TSDB_CODE_INVALID_MSG;
602
    return -1;
L
Liu Jicong 已提交
603 604
  }

605
  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
L
Liu Jicong 已提交
606 607 608 609 610 611 612 613 614 615
  if (pTopic == NULL) {
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
      return -1;
    }
  }
S
Shengliang Guan 已提交
616

L
Liu Jicong 已提交
617 618 619 620 621 622 623 624 625 626 627 628 629 630
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
    int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *name = taosArrayGetP(pConsumer->assignedTopics, i);
      if (strcmp(name, pTopic->name) == 0) {
        mndReleaseConsumer(pMnode, pConsumer);
        mndReleaseTopic(pMnode, pTopic);
        terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
S
Shengliang Guan 已提交
631
        mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s", dropReq.name,
L
Liu Jicong 已提交
632 633 634 635 636 637 638 639
               pConsumer->consumerId, pConsumer->cgroup);
        return -1;
      }
    }
    sdbRelease(pSdb, pConsumer);
  }

#if 0
L
Liu Jicong 已提交
640
  if (pTopic->refConsumerCnt != 0) {
641
    mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
642 643
    terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
S
Shengliang Guan 已提交
644 645
    return -1;
  }
L
Liu Jicong 已提交
646
#endif
S
Shengliang Guan 已提交
647

648 649 650 651
  if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
    return -1;
  }

L
Liu Jicong 已提交
652
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
653
  mndTransSetDbName(pTrans, pTopic->db, NULL);
L
Liu Jicong 已提交
654 655
  if (pTrans == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
656
    mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
657 658
    return -1;
  }
L
Liu Jicong 已提交
659

L
Liu Jicong 已提交
660
  mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
661

L
Liu Jicong 已提交
662 663
  if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
    ASSERT(0);
664 665
    mndTransDrop(pTrans);
    mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
666
    return -1;
L
Liu Jicong 已提交
667 668
  }

669
  // TODO check if rebalancing
L
Liu Jicong 已提交
670
  if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
671 672 673 674
    /*ASSERT(0);*/
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    mndTransDrop(pTrans);
    mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
675 676 677 678
    return -1;
  }

  int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
L
Liu Jicong 已提交
679 680 681 682
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    terrno = code;
S
Shengliang Guan 已提交
683
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
L
Liu Jicong 已提交
684 685 686
    return -1;
  }

S
Shengliang Guan 已提交
687
  return TSDB_CODE_ACTION_IN_PROGRESS;
L
Liu Jicong 已提交
688 689
}

L
Liu Jicong 已提交
690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t numOfTopics = 0;
  void   *pIter = NULL;
  while (1) {
    SMqTopicObj *pTopic = NULL;
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid == pDb->uid) {
      numOfTopics++;
    }

    sdbRelease(pSdb, pTopic);
  }

  *pNumOfTopics = numOfTopics;
  mndReleaseDb(pMnode, pDb);
  return 0;
}

S
Shengliang Guan 已提交
717 718
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode      *pMnode = pReq->info.node;
L
Liu Jicong 已提交
719 720 721
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      numOfRows = 0;
  SMqTopicObj *pTopic = NULL;
S
Shengliang Guan 已提交
722

L
Liu Jicong 已提交
723
  while (numOfRows < rowsCapacity) {
L
Liu Jicong 已提交
724 725 726
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) break;

L
Liu Jicong 已提交
727 728 729
    SColumnInfoData *pColInfo;
    SName            n;
    int32_t          cols = 0;
S
Shengliang Guan 已提交
730

L
Liu Jicong 已提交
731
    char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
L
Liu Jicong 已提交
732
    tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);
733 734
    tNameGetDbName(&n, varDataVal(topicName));
    varDataSetLen(topicName, strlen(varDataVal(topicName)));
L
Liu Jicong 已提交
735
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
736
    colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
L
Liu Jicong 已提交
737

L
Liu Jicong 已提交
738 739 740 741
    char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&n, varDataVal(dbName));
    varDataSetLen(dbName, strlen(varDataVal(dbName)));
L
Liu Jicong 已提交
742
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
743
    colDataAppend(pColInfo, numOfRows, (const char *)dbName, false);
L
Liu Jicong 已提交
744

L
Liu Jicong 已提交
745
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
746 747
    colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);

L
Liu Jicong 已提交
748 749
    char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
L
Liu Jicong 已提交
750
    varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
L
Liu Jicong 已提交
751
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
752
    colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
L
Liu Jicong 已提交
753 754 755 756 757

    numOfRows++;
    sdbRelease(pSdb, pTopic);
  }

758
  pShow->numOfRows += numOfRows;
L
Liu Jicong 已提交
759 760 761
  return numOfRows;
}

762 763 764 765 766
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
L
Liu Jicong 已提交
767 768 769 770

  return 0;
}

L
Liu Jicong 已提交
771 772 773 774 775 776 777 778 779
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;

  return 0;
}

L
Liu Jicong 已提交
780 781 782 783
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}
L
Liu Jicong 已提交
784

785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804
int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb) {
  SSdb *pSdb = pMnode->pSdb;

  void        *pIter = NULL;
  SMqTopicObj *pTopic = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid == pDb->uid) {
      sdbRelease(pSdb, pTopic);
      terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
      return -1;
    }

    sdbRelease(pSdb, pTopic);
  }
  return 0;
}

L
Liu Jicong 已提交
805
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
806
  int32_t code = 0;
L
Liu Jicong 已提交
807 808 809 810 811 812 813 814 815 816 817 818 819 820
  SSdb   *pSdb = pMnode->pSdb;

  void        *pIter = NULL;
  SMqTopicObj *pTopic = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid != pDb->uid) {
      sdbRelease(pSdb, pTopic);
      continue;
    }

    if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
821 822 823 824
      sdbRelease(pSdb, pTopic);
      sdbCancelFetch(pSdb, pIter);
      code = -1;
      break;
L
Liu Jicong 已提交
825
    }
826 827

    sdbRelease(pSdb, pTopic);
L
Liu Jicong 已提交
828 829 830 831
  }

  return code;
}