mndTopic.c 28.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndTopic.h"
L
Liu Jicong 已提交
17
#include "mndConsumer.h"
L
Liu Jicong 已提交
18 19 20
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
21
#include "mndOffset.h"
22
#include "mndPrivilege.h"
L
Liu Jicong 已提交
23
#include "mndShow.h"
S
Shengliang Guan 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndSubscribe.h"
L
Liu Jicong 已提交
26 27 28
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
29
#include "parser.h"
L
Liu Jicong 已提交
30 31
#include "tname.h"

L
Liu Jicong 已提交
32
#define MND_TOPIC_VER_NUMBER   1
S
Shengliang Guan 已提交
33
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
34

L
Liu Jicong 已提交
35 36 37
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
38 39
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
L
Liu Jicong 已提交
40

S
Shengliang Guan 已提交
41
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
42
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
43

L
Liu Jicong 已提交
44 45
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);

L
Liu Jicong 已提交
46
/**
 * Register the topic module with the mnode.
 *
 * Installs the SDB_TOPIC table callbacks (encode/decode/insert/update/delete),
 * the create/drop-topic request handlers, the vnode response handlers (all
 * routed to mndTransProcessRsp), and the retrieve / free-iterator hooks for
 * TSDB_MGMT_TABLE_TOPICS.
 *
 * @param pMnode  mnode whose SDB and handler tables are populated
 * @return        whatever sdbSetTable() returns for the topic table
 */
int32_t mndInitTopic(SMnode *pMnode) {
  SSdbTable table = {
      .sdbType = SDB_TOPIC,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
      .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
      .insertFp = (SdbInsertFp)mndTopicActionInsert,
      .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
      .deleteFp = (SdbDeleteFp)mndTopicActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ADD_CHECK_INFO_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DELETE_CHECK_INFO_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);

  return sdbSetTable(pMnode->pSdb, table);
}

/* Counterpart of mndInitTopic(); the body is empty, so calling it has no effect. */
void mndCleanupTopic(SMnode *pMnode) {
}

L
Liu Jicong 已提交
71 72 73 74 75
/**
 * Return the part of a topic name that follows its first '.'.
 *
 * @param topic  NUL-terminated topic name
 * @return       pointer into `topic` just past the first '.'; when the name
 *               contains no '.', `topic` itself is returned instead of the
 *               invalid pointer that `NULL + 1` would produce
 */
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
  const char *dot = strchr(topic, '.');
  return (dot != NULL) ? dot + 1 : topic;
}

L
Liu Jicong 已提交
76
/**
 * Serialize a topic object into a newly allocated SDB raw record.
 *
 * Field order written here (mndTopicActionDecode reads the same order):
 *   name, db, createTime, updateTime, uid, dbUid, version, subType, withMeta,
 *   stbUid, sqlLen + sql, astLen [+ ast], physicalPlanLen [+ physicalPlan],
 *   schemaLen [+ encoded schema], ntbUid [+ column count + column ids],
 *   ctbStbUid, MND_TOPIC_RESERVE_SIZE reserved bytes.
 *
 * @param pTopic  topic to encode
 * @return        the raw record, or NULL (terrno left non-success) on failure
 */
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
  // Pessimistic default: every early exit below reports a failure unless the
  // success code is stored just before the cleanup label.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *swBuf = NULL;
  int32_t physicalPlanLen = 0;
  if (pTopic->physicalPlan) {
    physicalPlanLen = strlen(pTopic->physicalPlan) + 1;  // stored with its NUL terminator
  }

  int32_t schemaLen = 0;
  if (pTopic->schema.nCols) {
    // A NULL output buffer makes the encoder only compute the encoded size.
    schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
  }

  int32_t ntbColLen = taosArrayGetSize(pTopic->ntbColIds) * sizeof(int16_t);

  int32_t size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + ntbColLen +
                 MND_TOPIC_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
  if (pRaw == NULL) goto TOPIC_ENCODE_OVER;

  int32_t dataPos = 0;
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->withMeta, TOPIC_ENCODE_OVER);

  SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
  if (pTopic->astLen) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
  }
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
  if (physicalPlanLen) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
  }
  SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
  if (schemaLen) {
    swBuf = taosMemoryMalloc(schemaLen);
    if (swBuf == NULL) {
      goto TOPIC_ENCODE_OVER;
    }
    // The encoder is handed a copy of the pointer so that swBuf keeps
    // addressing the start of the buffer for the write and the free below.
    void *aswBuf = swBuf;
    taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
    SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
  }

  SDB_SET_INT64(pRaw, dataPos, pTopic->ntbUid, TOPIC_ENCODE_OVER);
  if (pTopic->ntbUid != 0) {
    // The column-id list is only stored when ntbUid is non-zero.
    int32_t sz = taosArrayGetSize(pTopic->ntbColIds);
    SDB_SET_INT32(pRaw, dataPos, sz, TOPIC_ENCODE_OVER);
    for (int32_t i = 0; i < sz; i++) {
      int16_t colId = *(int16_t *)taosArrayGet(pTopic->ntbColIds, i);
      SDB_SET_INT16(pRaw, dataPos, colId, TOPIC_ENCODE_OVER);
    }
  }

  SDB_SET_INT64(pRaw, dataPos, pTopic->ctbStbUid, TOPIC_ENCODE_OVER);

  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_ENCODE_OVER:
  if (swBuf) taosMemoryFree(swBuf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
  return pRaw;
}

L
Liu Jicong 已提交
155
/**
 * Rebuild a topic row from an SDB raw record written by mndTopicActionEncode.
 *
 * Fields are read in the order the encoder writes them. Variable-length
 * members (sql, ast, physicalPlan, schema, ntbColIds) are allocated here.
 *
 * @param pRaw  raw record to decode
 * @return      the decoded row, or NULL (terrno left non-success) on failure
 */
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
  // Everything the cleanup label touches is initialised up front: the early
  // `goto`s below would otherwise reach it with indeterminate pointers.
  SSdbRow     *pRow = NULL;
  SMqTopicObj *pTopic = NULL;
  void        *buf = NULL;  // scratch buffer holding the encoded schema
  int32_t      len = 0;
  int32_t      dataPos = 0;
  int8_t       sver = 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;

  if (sver != MND_TOPIC_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto TOPIC_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqTopicObj));
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

  pTopic = sdbGetRowObj(pRow);
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;

  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER);

  SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
  if (pTopic->sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);

  SDB_GET_INT32(pRaw, dataPos, &pTopic->astLen, TOPIC_DECODE_OVER);
  if (pTopic->astLen) {
    pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char));
    if (pTopic->ast == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    // The encoder only writes the ast bytes when astLen is non-zero.
    SDB_GET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_DECODE_OVER);
  } else {
    pTopic->ast = NULL;
  }

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  if (len) {
    pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
    if (pTopic->physicalPlan == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
  } else {
    pTopic->physicalPlan = NULL;
  }

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  if (len) {
    buf = taosMemoryMalloc(len);
    if (buf == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
    if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
      goto TOPIC_DECODE_OVER;
    }
    // NOTE(review): assumes the decoded schema does not alias `buf`
    // (i.e. taosDecodeSSchemaWrapper copies what it needs) - confirm.
    taosMemoryFreeClear(buf);
  } else {
    pTopic->schema.nCols = 0;
    pTopic->schema.version = 0;
    pTopic->schema.pSchema = NULL;
  }

  SDB_GET_INT64(pRaw, dataPos, &pTopic->ntbUid, TOPIC_DECODE_OVER);
  if (pTopic->ntbUid != 0) {
    int32_t ntbColNum;
    SDB_GET_INT32(pRaw, dataPos, &ntbColNum, TOPIC_DECODE_OVER);
    pTopic->ntbColIds = taosArrayInit(ntbColNum, sizeof(int16_t));
    if (pTopic->ntbColIds == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    // The encoder writes one int16 per column id, so read all of them;
    // reading a single id would leave dataPos misaligned for ctbStbUid.
    for (int32_t i = 0; i < ntbColNum; i++) {
      int16_t colId;
      SDB_GET_INT16(pRaw, dataPos, &colId, TOPIC_DECODE_OVER);
      taosArrayPush(pTopic->ntbColIds, &colId);
    }
  }

  SDB_GET_INT64(pRaw, dataPos, &pTopic->ctbStbUid, TOPIC_DECODE_OVER);

  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    // pTopic is still NULL when the failure happened before the row existed.
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic == NULL ? "null" : pTopic->name, pRaw,
           terrstr());
    // NOTE(review): members already allocated for the row (sql, ast,
    // physicalPlan, ntbColIds, schema) are not released here - confirm
    // whether sdbAllocRow zero-fills the row before adding frees.
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
  return pRow;
}

L
Liu Jicong 已提交
262
/* SDB insert callback for topics: only traces the event and reports success. */
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
  mTrace("topic:%s, perform insert action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
267
/* SDB delete callback for topics: only traces the event and reports success. */
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
  mTrace("topic:%s, perform delete action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
272
/**
 * SDB update callback for topics.
 *
 * Copies updateTime and version from the new row into the existing one with
 * atomic exchanges; no other member is updated.
 *
 * @return always 0
 */
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
  mTrace("topic:%s, perform update action", pOldTopic->name);
  atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

  // TODO handle update of the remaining members; the write latch on
  // pOldTopic->lock that would guard it is currently not taken.

  return 0;
}

L
Liu Jicong 已提交
285 286 287
/**
 * Look up a topic by name in the SDB and acquire it.
 *
 * @param pMnode     mnode owning the SDB
 * @param topicName  key passed to sdbAcquire()
 * @return           the acquired topic, or NULL; a TSDB_CODE_SDB_OBJ_NOT_THERE
 *                   failure is translated to TSDB_CODE_MND_TOPIC_NOT_EXIST.
 *                   Release the result with mndReleaseTopic().
 */
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SSdb        *pSdb = pMnode->pSdb;
  SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
  if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return pTopic;
}

L
Liu Jicong 已提交
294
/* Hand a topic obtained from mndAcquireTopic() back to the SDB via sdbRelease(). */
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pTopic);
}

S
Shengliang Guan 已提交
299 300
/**
 * Build a drop-topic message addressed to one vgroup.
 *
 * The header length and vgId are stored with htonl(), the topic uid with
 * htobe64(); the topic name is copied verbatim.
 *
 * @return a message allocated with taosMemoryCalloc(), or NULL with terrno
 *         set to TSDB_CODE_OUT_OF_MEMORY
 */
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  int32_t contLen = sizeof(SDDropTopicReq);

  SDDropTopicReq *pDrop = taosMemoryCalloc(1, contLen);
  if (pDrop == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pDrop->head.contLen = htonl(contLen);
  pDrop->head.vgId = htonl(pVgroup->vgId);
  memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
  pDrop->tuid = htobe64(pTopic->uid);

  return pDrop;
}

L
Liu Jicong 已提交
316
/**
 * Validate a create-topic request.
 *
 * The request must carry an sql string, plus the field its subscription type
 * relies on: a non-empty ast for column topics, a non-empty subStbName for
 * table topics, a non-empty subDbName for database topics.
 *
 * @return 0 with terrno = TSDB_CODE_SUCCESS when valid, otherwise -1 with
 *         terrno = TSDB_CODE_MND_INVALID_TOPIC
 */
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
  // Set the failure code once; every early return below relies on it.
  terrno = TSDB_CODE_MND_INVALID_TOPIC;

  if (pCreate->sql == NULL) return -1;

  if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
    if (pCreate->ast == NULL || pCreate->ast[0] == 0) return -1;
  } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
    if (pCreate->subStbName[0] == 0) return -1;
  } else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {
    if (pCreate->subDbName[0] == 0) return -1;
  }

  terrno = TSDB_CODE_SUCCESS;
  return 0;
}

L
Liu Jicong 已提交
333
/**
 * Record which table a column topic's query reads from.
 *
 * For a child table the super-table uid is stored in pTopic->ctbStbUid.
 * For a normal table the table uid is stored in pTopic->ntbUid and the id of
 * every collected column that belongs to a normal table is appended to
 * pTopic->ntbColIds.
 *
 * @param pAst    parsed statement; cast to SSelectStmt whose pFromTable is
 *                read as an SRealTableNode
 * @param pTopic  topic being filled in; ntbColIds must already be initialised
 * @return        always 0
 */
static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
  SNodeList *pNodeList = NULL;
  nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);

  SRealTableNode *pFrom = (SRealTableNode *)((SSelectStmt *)pAst)->pFromTable;
  int64_t         suid = pFrom->pMeta->suid;
  int8_t          tableType = pFrom->pMeta->tableType;

  if (tableType == TSDB_CHILD_TABLE) {
    pTopic->ctbStbUid = suid;
  } else if (tableType == TSDB_NORMAL_TABLE) {
    SNode *pNode = NULL;
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;
      if (pCol->tableType == TSDB_NORMAL_TABLE) {
        pTopic->ntbUid = pCol->tableId;
        taosArrayPush(pTopic->ntbColIds, &pCol->colId);
      }
    }
  }

  // The collected list was produced for this call only; without this it leaks.
  // NOTE(review): assumes the list owns its nodes independently of pAst and
  // that nodesDestroyList() is its destructor - confirm.
  if (pNodeList != NULL) {
    nodesDestroyList(pNodeList);
  }
  return 0;
}

S
Shengliang Guan 已提交
353
/**
 * Build a topic object from a validated create request and submit the
 * transaction that persists it.
 *
 * For column topics the ast is parsed, planned and its result schema
 * extracted. For table topics the super-table uid is looked up. When the
 * query reads a normal table, a TDMT_VND_ADD_CHECK_INFO redo action is added
 * for every vgroup of the database.
 *
 * Every temporary owned by this function is released at the single cleanup
 * label, on success and on failure alike; the encoded raw record carries its
 * own copy of the topic data.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared,
 *         -1 (terrno set) otherwise
 */
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
  int32_t     ret = -1;
  STrans     *pTrans = NULL;
  SNode      *pAst = NULL;
  SQueryPlan *pPlan = NULL;
  SSdbRaw    *pCommitRaw = NULL;
  SMqTopicObj topicObj = {0};

  mDebug("topic:%s to create", pCreate->name);

  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;
  topicObj.sql = strdup(pCreate->sql);
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
  if (topicObj.sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  topicObj.subType = pCreate->subType;
  topicObj.withMeta = pCreate->withMeta;
  if (topicObj.withMeta) {
    ASSERT(topicObj.subType != TOPIC_SUB_TYPE__COLUMN);
  }

  if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
    topicObj.ast = strdup(pCreate->ast);
    topicObj.astLen = strlen(pCreate->ast) + 1;
    if (topicObj.ast == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      pAst = NULL;  // do not trust the output of a failed parse in cleanup
      goto _OVER;
    }

    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      pPlan = NULL;  // do not trust the output of a failed planner call in cleanup
      goto _OVER;
    }

    topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
    if (topicObj.ntbColIds == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }
    extractTopicTbInfo(pAst, &topicObj);

    // The column-id list is only meaningful for normal-table topics.
    if (topicObj.ntbUid == 0) {
      taosArrayDestroy(topicObj.ntbColIds);
      topicObj.ntbColIds = NULL;
    }

    if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto _OVER;
    }

    if (nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto _OVER;
    }
  } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
    SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
    if (pStb == NULL) {
      terrno = TSDB_CODE_MND_STB_NOT_EXIST;
      goto _OVER;
    }
    topicObj.stbUid = pStb->uid;
    // Only the uid is needed; give the acquired reference back right away.
    mndReleaseStb(pMnode, pStb);
  }
  // TOPIC_SUB_TYPE__DB needs no extra members: ast, astLen and physicalPlan
  // stay zero from the initialiser.

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto _OVER;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

  pCommitRaw = mndTopicActionEncode(&topicObj);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    goto _OVER;
  }
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);

  if (topicObj.ntbUid != 0) {
    STqCheckInfo info;
    memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
    info.ntbUid = topicObj.ntbUid;
    info.colIdList = topicObj.ntbColIds;

    // Broadcast the check info to every vgroup of the topic's database.
    // NOTE(review): the early exits below leave the sdbFetch iterator
    // un-cancelled, as the original code did - confirm whether it needs
    // an explicit cancel.
    void   *pIter = NULL;
    SSdb   *pSdb = pMnode->pSdb;
    SVgObj *pVgroup = NULL;
    while (1) {
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
      if (pIter == NULL) break;
      if (!mndVgroupInDb(pVgroup, topicObj.dbUid)) {
        sdbRelease(pSdb, pVgroup);
        continue;
      }

      // First pass computes the encoded size, second pass fills the buffer.
      int32_t len;
      int32_t code;
      tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
      if (code < 0) {
        sdbRelease(pSdb, pVgroup);
        ASSERT(0);
        goto _OVER;
      }

      void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
      if (buf == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sdbRelease(pSdb, pVgroup);
        goto _OVER;
      }
      void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
      SEncoder encoder;
      tEncoderInit(&encoder, abuf, len);
      if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
        tEncoderClear(&encoder);
        taosMemoryFree(buf);
        sdbRelease(pSdb, pVgroup);
        goto _OVER;
      }
      tEncoderClear(&encoder);
      ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);

      STransAction action = {0};
      action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
      action.pCont = buf;
      action.contLen = sizeof(SMsgHead) + len;
      action.msgType = TDMT_VND_ADD_CHECK_INFO;
      if (mndTransAppendRedoAction(pTrans, &action) != 0) {
        taosMemoryFree(buf);
        sdbRelease(pSdb, pVgroup);
        goto _OVER;
      }

      // Each fetched vgroup is released once the loop is done with it, as the
      // skip branch above already does.
      sdbRelease(pSdb, pVgroup);
    }
  }

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto _OVER;
  }

  ret = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  // NOTE(review): assumes nodesDestroyNode() is the destructor for the
  // outputs of nodesStringToNode() and qCreateQueryPlan() - confirm.
  if (pAst != NULL) nodesDestroyNode(pAst);
  if (pPlan != NULL) nodesDestroyNode((SNode *)pPlan);
  taosMemoryFreeClear(topicObj.physicalPlan);
  taosMemoryFreeClear(topicObj.ast);
  taosMemoryFreeClear(topicObj.sql);
  taosMemoryFreeClear(topicObj.schema.pSchema);
  if (topicObj.ntbColIds != NULL) {
    taosArrayDestroy(topicObj.ntbColIds);
    topicObj.ntbColIds = NULL;
  }
  if (pTrans != NULL) mndTransDrop(pTrans);
  return ret;
}

S
Shengliang Guan 已提交
519 520
/**
 * Handler for TDMT_MND_CREATE_TOPIC.
 *
 * Deserializes and validates the request, rejects (or, with igExists,
 * silently accepts) an already existing topic, checks read privilege on the
 * subscribed database, then delegates to mndCreateTopic().
 *
 * @return 0 when the topic exists and igExists is set,
 *         TSDB_CODE_ACTION_IN_PROGRESS when the create transaction was
 *         submitted, -1 (terrno set) on failure
 */
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  int32_t           code = -1;
  SMqTopicObj      *pTopic = NULL;
  SDbObj           *pDb = NULL;
  SCMCreateTopicReq createTopicReq = {0};

  if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);

  if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
    goto _OVER;
  }

  pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
  if (pTopic != NULL) {
    if (createTopicReq.igExists) {
      mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
      code = 0;
      goto _OVER;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
      goto _OVER;
    }
  } else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
    // The lookup failed for a reason other than "no such topic".
    goto _OVER;
  }

  pDb = mndAcquireDb(pMnode, createTopicReq.subDbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pDb) != 0) {
    goto _OVER;
  }

  code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
  }

  // pTopic / pDb may still be NULL here; they are passed to the release
  // helpers unconditionally, exactly as before.
  mndReleaseTopic(pMnode, pTopic);
  mndReleaseDb(pMnode, pDb);

  tFreeSCMCreateTopicReq(&createTopicReq);
  return code;
}

577
/**
 * Append the "topic dropped" commit log to a transaction and prepare it.
 *
 * The transaction is dropped with mndTransDrop() on every path, so the caller
 * must not touch pTrans after this call.
 *
 * @return 0 on success, -1 on failure
 */
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
  SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }

  mndTransDrop(pTrans);
  return 0;
}

S
Shengliang Guan 已提交
596
/**
 * Handler for TDMT_MND_DROP_TOPIC.
 *
 * Refuses to drop a topic that is still listed in any consumer's
 * assignedTopics (consumers in MQ_CONSUMER_STATUS__LOST_REBD are skipped),
 * checks privilege on the topic's database, then builds a transaction that
 * drops the topic's subscriptions, sends TDMT_VND_DELETE_CHECK_INFO to the
 * database's vgroups for normal-table topics, and finally drops the topic.
 *
 * Every reference acquired here (topic, consumer, vgroup) is released on all
 * exit paths.
 *
 * @return 0 when the topic is absent and igNotExists is set,
 *         TSDB_CODE_ACTION_IN_PROGRESS when the drop transaction was
 *         submitted, -1 (terrno set) on failure
 */
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  SMDropTopicReq dropReq = {0};

  if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
  if (pTopic == NULL) {
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
      return -1;
    }
  }

  // NOTE(review): early exits from the fetch loops below leave the sdbFetch
  // iterator un-cancelled, as the original code did - confirm whether it
  // needs an explicit cancel.
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) {
      // Skipped consumers still hold the reference taken by sdbFetch.
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *name = taosArrayGetP(pConsumer->assignedTopics, i);
      if (strcmp(name, pTopic->name) == 0) {
        // Log while the consumer is still held; it is read by the message.
        mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s", dropReq.name,
               pConsumer->consumerId, pConsumer->cgroup);
        mndReleaseConsumer(pMnode, pConsumer);
        mndReleaseTopic(pMnode, pTopic);
        terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
        return -1;
      }
    }
    sdbRelease(pSdb, pConsumer);
  }

#if 0
  if (pTopic->refConsumerCnt != 0) {
    mndReleaseTopic(pMnode, pTopic);
    terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
    return -1;
  }
#endif

  if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
    mndReleaseTopic(pMnode, pTopic);
    return -1;
  }

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
  if (pTrans == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    mndReleaseTopic(pMnode, pTopic);
    return -1;
  }
  // Only touch the transaction once it is known to exist.
  mndTransSetDbName(pTrans, pTopic->db, NULL);

  mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);

#if 0
  if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
    ASSERT(0);
    mndTransDrop(pTrans);
    mndReleaseTopic(pMnode, pTopic);
    return -1;
  }
#endif

  // TODO check if rebalancing
  if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    mndTransDrop(pTrans);
    mndReleaseTopic(pMnode, pTopic);
    return -1;
  }

  if (pTopic->ntbUid != 0) {
    // Tell every vgroup of the topic's database to delete its check info.
    void   *pVgIter = NULL;
    SVgObj *pVgroup = NULL;
    while (1) {
      pVgIter = sdbFetch(pSdb, SDB_VGROUP, pVgIter, (void **)&pVgroup);
      if (pVgIter == NULL) break;
      if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
        sdbRelease(pSdb, pVgroup);
        continue;
      }

      void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
      if (buf == NULL) {
        sdbRelease(pSdb, pVgroup);
        mndTransDrop(pTrans);
        mndReleaseTopic(pMnode, pTopic);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
      void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
      ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
      memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);

      STransAction action = {0};
      action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
      action.pCont = buf;
      action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
      action.msgType = TDMT_VND_DELETE_CHECK_INFO;
      if (mndTransAppendRedoAction(pTrans, &action) != 0) {
        taosMemoryFree(buf);
        sdbRelease(pSdb, pVgroup);
        mndTransDrop(pTrans);
        mndReleaseTopic(pMnode, pTopic);
        return -1;
      }

      sdbRelease(pSdb, pVgroup);
    }
  }

  // mndDropTopic() drops pTrans itself on every path.
  int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    // terrno already describes the failure; storing `code` (-1) into it
    // would discard the real reason.
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
    return -1;
  }

  return TSDB_CODE_ACTION_IN_PROGRESS;
}

L
Liu Jicong 已提交
724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750
// Count the topics whose dbUid equals the uid of the database named dbName.
//
// pMnode       - mnode whose sdb is scanned
// dbName       - name handed to mndAcquireDb to look the database up
// pNumOfTopics - out: receives the number of matching topics on success
//
// Returns 0 on success. When the database cannot be acquired, sets terrno to
// TSDB_CODE_MND_DB_NOT_SELECTED and returns -1 without writing *pNumOfTopics.
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pTargetDb = mndAcquireDb(pMnode, dbName);
  if (pTargetDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t      matched = 0;
  void        *pCursor = NULL;
  SMqTopicObj *pEntry = NULL;

  // Walk every topic row; each fetched row is released before the next fetch.
  while ((pCursor = sdbFetch(pSdb, SDB_TOPIC, pCursor, (void **)&pEntry)) != NULL) {
    if (pEntry->dbUid == pTargetDb->uid) {
      matched += 1;
    }
    sdbRelease(pSdb, pEntry);
  }

  *pNumOfTopics = matched;
  mndReleaseDb(pMnode, pTargetDb);
  return 0;
}

S
Shengliang Guan 已提交
751 752
// Fill pBlock with up to rowsCapacity topic rows for a show/retrieve request.
//
// The scan position is kept in pShow->pIter, so successive calls continue
// where the previous one stopped. Columns are written in this order:
//   0: topic name (var-string)
//   1: database name (var-string)
//   2: pTopic->createTime
//   3: pTopic->sql, copied with a TSDB_SHOW_SQL_LEN bound (var-string)
//
// Returns the number of rows appended by this call; the same amount is added
// to pShow->numOfRows.
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode      *pMnode = pReq->info.node;
  SSdb        *pSdb = pMnode->pSdb;
  SMqTopicObj *pTopic = NULL;
  int32_t      rowIdx = 0;

  for (;;) {
    if (rowIdx >= rowsCapacity) {
      break;
    }

    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) {
      break;
    }

    SColumnInfoData *pCol;
    SName            parsed;
    int32_t          colIdx = 0;

    // Column 0: topic name.
    // NOTE(review): pTopic->name is parsed as an acct+db name and the "db"
    // part is emitted as the topic name -- presumably the stored name is
    // "<acct>.<topic>"; confirm against where the topic name is built.
    char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&parsed, pTopic->name, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&parsed, varDataVal(topicName));
    varDataSetLen(topicName, strlen(varDataVal(topicName)));
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    colIdx++;
    colDataAppend(pCol, rowIdx, (const char *)topicName, false);

    // Column 1: database name, extracted from the topic's full db name.
    char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&parsed, pTopic->db, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&parsed, varDataVal(dbName));
    varDataSetLen(dbName, strlen(varDataVal(dbName)));
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    colIdx++;
    colDataAppend(pCol, rowIdx, (const char *)dbName, false);

    // Column 2: creation time, passed by address.
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    colIdx++;
    colDataAppend(pCol, rowIdx, (const char *)&pTopic->createTime, false);

    // Column 3: the topic's SQL text, bounded by TSDB_SHOW_SQL_LEN.
    char  sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    char *sqlText = &sql[VARSTR_HEADER_SIZE];
    tstrncpy(sqlText, pTopic->sql, TSDB_SHOW_SQL_LEN);
    varDataSetLen(sql, strlen(sqlText));
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    colIdx++;
    colDataAppend(pCol, rowIdx, (const char *)sql, false);

    rowIdx++;
    sdbRelease(pSdb, pTopic);
  }

  pShow->numOfRows += rowIdx;
  return rowIdx;
}

796 797 798 799 800
// Encode pTopic, append the encoded raw to pTrans as a commit log, and mark
// that raw SDB_STATUS_READY.
//
// pMnode is not used by this function.
//
// Returns 0 on success, -1 if encoding, appending, or setting the status fails.
//
// NOTE(review): when the append fails the encoded raw is not released here --
// confirm whether the transaction or the caller is expected to own it.
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  SSdbRaw *pRaw = mndTopicActionEncode(pTopic);
  if (pRaw == NULL) {
    return -1;
  }

  // Short-circuit keeps the original order: append first, then set the status.
  if (mndTransAppendCommitlog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
805 806 807 808 809 810 811 812 813
// Encode pTopic, append the encoded raw to pTrans as a commit log, and mark
// that raw SDB_STATUS_DROPPED.
//
// pMnode is not used by this function.
//
// Returns 0 on success, -1 if encoding, appending, or setting the status fails.
//
// NOTE(review): when the append fails the encoded raw is not released here --
// confirm whether the transaction or the caller is expected to own it.
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  SSdbRaw *pRaw = mndTopicActionEncode(pTopic);
  int32_t  code = -1;

  if (pRaw != NULL) {
    if (mndTransAppendCommitlog(pTrans, pRaw) == 0) {
      if (sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) == 0) {
        code = 0;
      }
    }
  }

  return code;
}

L
Liu Jicong 已提交
814 815 816 817
// Cancel an in-progress topic fetch by forwarding pIter to sdbCancelFetch on
// the mnode's sdb.
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}
L
Liu Jicong 已提交
818

819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838
// Check whether any topic belongs to the database pDb.
//
// pMnode - mnode whose sdb is scanned
// pDb    - database whose uid is compared against each topic's dbUid
//
// Returns 0 when no topic references pDb. When one does, sets terrno to
// TSDB_CODE_MND_TOPIC_MUST_BE_DELETED and returns -1.
int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb) {
  SSdb *pSdb = pMnode->pSdb;

  void        *pIter = NULL;
  SMqTopicObj *pTopic = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid == pDb->uid) {
      sdbRelease(pSdb, pTopic);
      // The scan is abandoned before sdbFetch has returned NULL, so the fetch
      // must be cancelled explicitly, the same way mndDropTopicByDB does on
      // its early exit. Done before terrno is set so it cannot be overwritten.
      sdbCancelFetch(pSdb, pIter);
      terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
      return -1;
    }

    sdbRelease(pSdb, pTopic);
  }
  return 0;
}

L
Liu Jicong 已提交
839
// Append a "dropped" commit log to pTrans for every topic whose dbUid equals
// pDb->uid.
//
// Returns 0 when every matching topic was handled, or -1 as soon as
// mndSetDropTopicCommitLogs fails for one of them; in that case the scan
// stops and the in-progress fetch is cancelled.
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb        *pSdb = pMnode->pSdb;
  void        *pCursor = NULL;
  SMqTopicObj *pEntry = NULL;

  while ((pCursor = sdbFetch(pSdb, SDB_TOPIC, pCursor, (void **)&pEntry)) != NULL) {
    // Topics of other databases fall through to the release below.
    if (pEntry->dbUid == pDb->uid && mndSetDropTopicCommitLogs(pMnode, pTrans, pEntry) < 0) {
      sdbRelease(pSdb, pEntry);
      // Leaving the loop early: the fetch has to be cancelled explicitly.
      sdbCancelFetch(pSdb, pCursor);
      return -1;
    }

    sdbRelease(pSdb, pEntry);
  }

  return 0;
}