mndTopic.c 23.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndTopic.h"
S
Shengliang Guan 已提交
17
#include "mndAuth.h"
L
Liu Jicong 已提交
18
#include "mndConsumer.h"
L
Liu Jicong 已提交
19 20 21
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
22
#include "mndOffset.h"
L
Liu Jicong 已提交
23
#include "mndShow.h"
S
Shengliang Guan 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndSubscribe.h"
L
Liu Jicong 已提交
26 27 28
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
29
#include "parser.h"
L
Liu Jicong 已提交
30 31
#include "tname.h"

L
Liu Jicong 已提交
32
#define MND_TOPIC_VER_NUMBER   1
S
Shengliang Guan 已提交
33
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
34

L
Liu Jicong 已提交
35 36 37
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
38 39
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
L
Liu Jicong 已提交
40

S
Shengliang Guan 已提交
41
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
42
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
43

L
Liu Jicong 已提交
44 45
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);

L
Liu Jicong 已提交
46
/*
 * Prepares the mnode for topic management: describes the topic table to the
 * sdb (key type plus encode/decode/insert/update/delete callbacks), installs
 * the create/drop message handlers and the show handlers for the topics table.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitTopic(SMnode *pMnode) {
  SSdbTable topicTable = {0};
  topicTable.sdbType = SDB_TOPIC;
  topicTable.keyType = SDB_KEY_BINARY;
  topicTable.encodeFp = (SdbEncodeFp)mndTopicActionEncode;
  topicTable.decodeFp = (SdbDecodeFp)mndTopicActionDecode;
  topicTable.insertFp = (SdbInsertFp)mndTopicActionInsert;
  topicTable.updateFp = (SdbUpdateFp)mndTopicActionUpdate;
  topicTable.deleteFp = (SdbDeleteFp)mndTopicActionDelete;

  // Request handlers; the vnode drop response is routed to the transaction engine.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);

  // Handlers backing the "show topics" table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);

  return sdbSetTable(pMnode->pSdb, topicTable);
}

/* Counterpart of mndInitTopic(); there is currently nothing to tear down. */
void mndCleanupTopic(SMnode *pMnode) {
  (void)pMnode;
}

L
Liu Jicong 已提交
69 70 71 72 73
/*
 * Returns the part of a full topic name that follows the first '.'.
 *
 * The returned pointer aliases `topic`; nothing is allocated. If the name
 * contains no '.', the whole name is returned instead of computing a pointer
 * from a NULL strchr() result (which would be undefined behaviour).
 */
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
  const char *pDot = strchr(topic, '.');
  return (pDot != NULL) ? pDot + 1 : topic;
}

74
/*
 * Checks whether column/tag `colId` of super table `suid` is referenced by any
 * column-subscription topic, i.e. whether it may be modified.
 *
 * Every topic of type TOPIC_SUB_TYPE__COLUMN has its stored AST parsed and its
 * columns collected. Scanning of one topic stops at the first collected column
 * that belongs to a different table, or at the first match.
 *
 * Returns 0 when no topic uses the column. Returns -1 with
 * terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC when one does, and -1 when a
 * stored AST cannot be parsed.
 *
 * Every exit path releases the fetched topic, cancels the sdb iterator when
 * leaving the loop early, and destroys both the parsed AST and the collected
 * column list.
 */
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;
  bool  found = false;

  while (1) {
    SMqTopicObj *pTopic = NULL;
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
      sdbRelease(pSdb, pTopic);
      continue;
    }

    SNode *pAst = NULL;
    if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
      ASSERT(0);
      // Only reached when ASSERT does not abort: do not leave the iterator
      // or the topic reference behind.
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pTopic);
      return -1;
    }

    SNodeList *pNodeList = NULL;
    nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
    SNode *pNode = NULL;
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;
      if (pCol->tableId != suid) goto NEXT;
      if (pCol->colId > 0 && pCol->colId == colId) {
        found = true;
        goto NEXT;
      }
      mTrace("topic:%s, colId:%d is used", pTopic->name, pCol->colId);
    }

  NEXT:
    // NOTE(review): assumes nodesDestroyList() accepts NULL and that the
    // collected list owns its nodes independently of pAst -- confirm.
    nodesDestroyList(pNodeList);
    nodesDestroyNode(pAst);
    if (found) {
      // Leaving the loop early: the iterator must be cancelled explicitly.
      sdbCancelFetch(pSdb, pIter);
    }
    sdbRelease(pSdb, pTopic);
    if (found) {
      terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
      return -1;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
118
/*
 * Serializes a topic object into a newly allocated sdb raw record.
 *
 * Field order (must match mndTopicActionDecode): name, db, createTime,
 * updateTime, uid, dbUid, version, subType, stbUid, sqlLen + sql, astLen +
 * ast (ast bytes only when astLen != 0), physical plan length + plan (plan
 * bytes only when present), schema length + encoded schema (only when the
 * schema has columns), then the reserved area.
 *
 * Returns the raw record on success, or NULL on failure with terrno left at
 * the failure code.
 */
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
  // Start pessimistic; terrno is switched to success only after every field is written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void *pSchemaBuf = NULL;

  // The stored plan length includes the terminating NUL.
  int32_t planLen = 0;
  if (pTopic->physicalPlan != NULL) {
    planLen = strlen(pTopic->physicalPlan) + 1;
  }

  // A first pass with a NULL output only measures the encoded schema.
  int32_t schemaLen = 0;
  if (pTopic->schema.nCols != 0) {
    schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
  }

  int32_t rawSize =
      sizeof(SMqTopicObj) + planLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto ENCODE_DONE;

  int32_t dataPos = 0;

  // Fixed-size header fields.
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, ENCODE_DONE);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, ENCODE_DONE);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, ENCODE_DONE);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, ENCODE_DONE);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, ENCODE_DONE);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, ENCODE_DONE);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, ENCODE_DONE);
  SDB_SET_INT8(pRaw, dataPos, pTopic->subType, ENCODE_DONE);

  SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, ENCODE_DONE);

  // Length-prefixed variable parts.
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, ENCODE_DONE);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, ENCODE_DONE);

  SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, ENCODE_DONE);
  if (pTopic->astLen != 0) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, ENCODE_DONE);
  }

  SDB_SET_INT32(pRaw, dataPos, planLen, ENCODE_DONE);
  if (planLen != 0) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, planLen, ENCODE_DONE);
  }

  SDB_SET_INT32(pRaw, dataPos, schemaLen, ENCODE_DONE);
  if (schemaLen != 0) {
    pSchemaBuf = taosMemoryMalloc(schemaLen);
    if (pSchemaBuf == NULL) {
      goto ENCODE_DONE;
    }
    // The encoder advances the cursor it is given, so hand it a copy.
    void *pCursor = pSchemaBuf;
    taosEncodeSSchemaWrapper(&pCursor, &pTopic->schema);
    SDB_SET_BINARY(pRaw, dataPos, pSchemaBuf, schemaLen, ENCODE_DONE);
  }

  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, ENCODE_DONE);
  SDB_SET_DATALEN(pRaw, dataPos, ENCODE_DONE);

  terrno = TSDB_CODE_SUCCESS;

ENCODE_DONE:
  if (pSchemaBuf != NULL) taosMemoryFree(pSchemaBuf);

  if (terrno == TSDB_CODE_SUCCESS) {
    mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
    return pRaw;
  }

  mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

L
Liu Jicong 已提交
184
/*
 * Rebuilds a topic row from an sdb raw record written by mndTopicActionEncode.
 *
 * Only records with soft version MND_TOPIC_VER_NUMBER are accepted; any other
 * version fails with terrno = TSDB_CODE_SDB_INVALID_DATA_VER.
 *
 * Returns the new row on success, or NULL on failure with terrno left at the
 * failure code. On failure every buffer allocated here is freed.
 */
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
  // All of these are read at the cleanup label, so they must be initialized
  // before the first goto.
  SSdbRow     *pRow = NULL;
  SMqTopicObj *pTopic = NULL;
  void        *pSchemaBuf = NULL;
  int32_t      len = 0;
  int32_t      dataPos = 0;
  int8_t       sver = 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;

  if (sver != MND_TOPIC_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto TOPIC_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqTopicObj));
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

  pTopic = sdbGetRowObj(pRow);
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;

  // Make the failure cleanup below safe no matter how far decoding gets.
  pTopic->sql = NULL;
  pTopic->ast = NULL;
  pTopic->physicalPlan = NULL;
  pTopic->schema.pSchema = NULL;

  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);

  SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);

  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
  if (pTopic->sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);

  // The encoder writes the ast bytes only when astLen != 0; mirror that here
  // so a NULL destination is never handed to the reader.
  SDB_GET_INT32(pRaw, dataPos, &pTopic->astLen, TOPIC_DECODE_OVER);
  if (pTopic->astLen) {
    pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char));
    if (pTopic->ast == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_DECODE_OVER);
  }

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  if (len) {
    pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
    if (pTopic->physicalPlan == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
  }

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  if (len) {
    // Scratch buffer for the encoded schema; released at the cleanup label.
    // NOTE(review): assumes taosDecodeSSchemaWrapper() copies what it needs
    // out of the buffer (the encoder likewise uses a temporary) -- confirm.
    pSchemaBuf = taosMemoryMalloc(len);
    if (pSchemaBuf == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pSchemaBuf, len, TOPIC_DECODE_OVER);
    if (taosDecodeSSchemaWrapper(pSchemaBuf, &pTopic->schema) == NULL) {
      goto TOPIC_DECODE_OVER;
    }
  } else {
    pTopic->schema.nCols = 0;
    pTopic->schema.version = 0;
    pTopic->schema.pSchema = NULL;
  }

  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_DECODE_OVER:
  if (pSchemaBuf != NULL) taosMemoryFree(pSchemaBuf);

  if (terrno != TSDB_CODE_SUCCESS) {
    // pTopic is still NULL when the failure happened before the row existed.
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic != NULL ? pTopic->name : "null", pRaw,
           terrstr());
    if (pTopic != NULL) {
      taosMemoryFreeClear(pTopic->sql);
      taosMemoryFreeClear(pTopic->ast);
      taosMemoryFreeClear(pTopic->physicalPlan);
      taosMemoryFreeClear(pTopic->schema.pSchema);
    }
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
  return pRow;
}

L
Liu Jicong 已提交
276
/* sdb insert callback for topics: only traces the event and reports success. */
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
  (void)pSdb;
  mTrace("topic:%s, perform insert action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
281
/* sdb delete callback for topics: only traces the event and reports success. */
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
  (void)pSdb;
  mTrace("topic:%s, perform delete action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
286
/*
 * sdb update callback for topics.
 *
 * Copies updateTime and version from the new object into the existing one
 * using atomic exchanges; no other field is carried over. Always returns 0.
 */
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
  (void)pSdb;
  mTrace("topic:%s, perform update action", pOldTopic->name);

  atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

  // TODO handle update (the remaining fields; the write latch around this
  // section is currently disabled).

  return 0;
}

L
Liu Jicong 已提交
299 300 301
/*
 * Looks up a topic by name in the sdb and returns the acquired object.
 *
 * Returns NULL when the lookup fails; the generic
 * TSDB_CODE_SDB_OBJ_NOT_THERE error is translated into
 * TSDB_CODE_MND_TOPIC_NOT_EXIST, any other terrno is left untouched.
 * A non-NULL result is handed back with mndReleaseTopic().
 */
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SMqTopicObj *pFound = sdbAcquire(pMnode->pSdb, SDB_TOPIC, topicName);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return NULL;
}

L
Liu Jicong 已提交
308
/* Hands a topic obtained from mndAcquireTopic() back to the sdb. */
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
  sdbRelease(pMnode->pSdb, pTopic);
}

S
Shengliang Guan 已提交
313 314
/*
 * Allocates and fills a drop-topic request addressed to one vgroup.
 *
 * The header carries the content length and vgroup id converted with htonl,
 * the topic uid is converted with htobe64, and the full topic name is copied
 * verbatim. Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation fails; otherwise the caller receives the allocated message.
 */
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  (void)pMnode;

  int32_t         msgLen = sizeof(SDDropTopicReq);
  SDDropTopicReq *pMsg = taosMemoryCalloc(1, msgLen);
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pMsg->head.vgId = htonl(pVgroup->vgId);
  pMsg->head.contLen = htonl(msgLen);
  pMsg->tuid = htobe64(pTopic->uid);
  memcpy(pMsg->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);

  return pMsg;
}

L
Liu Jicong 已提交
330
/*
 * Validates a create-topic request.
 *
 * The request needs a SQL string, plus: a non-empty AST for column
 * subscriptions, a non-empty super table name for table subscriptions, or a
 * non-empty database name for database subscriptions. Other subscription
 * types are not checked further.
 *
 * Returns 0 with terrno = TSDB_CODE_SUCCESS when valid, otherwise -1 with
 * terrno = TSDB_CODE_MND_INVALID_TOPIC.
 */
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
  terrno = TSDB_CODE_MND_INVALID_TOPIC;

  if (pCreate->sql == NULL) return -1;

  bool valid = true;
  switch (pCreate->subType) {
    case TOPIC_SUB_TYPE__COLUMN:
      valid = (pCreate->ast != NULL && pCreate->ast[0] != 0);
      break;
    case TOPIC_SUB_TYPE__TABLE:
      valid = (pCreate->subStbName[0] != 0);
      break;
    case TOPIC_SUB_TYPE__DB:
      valid = (pCreate->subDbName[0] != 0);
      break;
    default:
      break;
  }
  if (!valid) return -1;

  terrno = TSDB_CODE_SUCCESS;
  return 0;
}

S
Shengliang Guan 已提交
347
/*
 * Builds a new topic object from a validated create request and submits it to
 * the sdb through a transaction.
 *
 * For column subscriptions the request AST is parsed, a query plan is created,
 * the result schema is extracted and the plan is serialized into the topic.
 * For table subscriptions the uid of the subscribed super table is recorded.
 *
 * Returns 0 once the transaction has been prepared, -1 on any failure
 * (terrno is set by the failing step).
 *
 * Everything allocated here (SQL/AST copies, parsed AST, query plan, extracted
 * schema, serialized plan, the transaction handle) is released on every exit
 * path; the encoded raw record holds its own copy of the data.
 */
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
  int32_t     code = -1;
  SNode      *pAst = NULL;
  SQueryPlan *pPlan = NULL;
  STrans     *pTrans = NULL;
  SSdbRaw    *pCommitRaw = NULL;
  SMqTopicObj topicObj = {0};

  mDebug("topic:%s to create", pCreate->name);

  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;
  topicObj.subType = pCreate->subType;

  topicObj.sql = strdup(pCreate->sql);
  if (topicObj.sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_CLEANUP;
  }
  topicObj.sqlLen = strlen(pCreate->sql) + 1;

  if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
    topicObj.ast = strdup(pCreate->ast);
    if (topicObj.ast == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_CLEANUP;
    }
    topicObj.astLen = strlen(pCreate->ast) + 1;

    if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_CLEANUP;
    }

    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_CLEANUP;
    }

    if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_CLEANUP;
    }

    if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_CLEANUP;
    }
  } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
    SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
    if (pStb == NULL) {
      // NOTE(review): relies on mndAcquireStb() having set terrno -- confirm.
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_CLEANUP;
    }
    topicObj.stbUid = pStb->uid;
    // Only the uid is needed; give the reference back right away.
    mndReleaseStb(pMnode, pStb);
  }
  /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
  /*topicObj.ast = NULL;*/
  /*topicObj.astLen = 0;*/
  /*topicObj.physicalPlan = NULL;*/
  /*topicObj.withTbName = 1;*/
  /*topicObj.withSchema = 1;*/

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_CLEANUP;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

  pCommitRaw = mndTopicActionEncode(&topicObj);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    goto CREATE_TOPIC_CLEANUP;
  }
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto CREATE_TOPIC_CLEANUP;
  }

  code = 0;

CREATE_TOPIC_CLEANUP:
  if (pTrans != NULL) mndTransDrop(pTrans);
  // The plan was already passed to nodesNodeToString() as a node, so it is
  // destroyed through the same node API as the AST.
  if (pPlan != NULL) nodesDestroyNode((SNode *)pPlan);
  if (pAst != NULL) nodesDestroyNode(pAst);
  taosMemoryFreeClear(topicObj.sql);
  taosMemoryFreeClear(topicObj.ast);
  taosMemoryFreeClear(topicObj.physicalPlan);
  // NOTE(review): assumes qExtractResultSchema() hands over a heap buffer
  // owned by the caller -- confirm.
  taosMemoryFreeClear(topicObj.schema.pSchema);
  return code;
}

S
Shengliang Guan 已提交
438 439
/*
 * Message handler for TDMT_MND_CREATE_TOPIC.
 *
 * Steps: deserialize the request, validate it, make sure the topic does not
 * exist yet (or succeed silently when igExists is set), resolve the target
 * database, check that the requesting user may write to it, then create the
 * topic.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the create transaction was
 * started, 0 when an existing topic is ignored, and -1 (terrno set) otherwise.
 */
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCMCreateTopicReq createTopicReq = {0};
  SMqTopicObj      *pExisting = NULL;
  SDbObj           *pDb = NULL;
  SUserObj         *pUser = NULL;
  int32_t           code = -1;

  if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_TOPIC_OVER;
  }

  mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);

  if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
    goto CREATE_TOPIC_OVER;
  }

  pExisting = mndAcquireTopic(pMnode, createTopicReq.name);
  if (pExisting != NULL) {
    // Already there: either tolerated by the request or an error.
    if (createTopicReq.igExists) {
      mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
      code = 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
    }
    goto CREATE_TOPIC_OVER;
  }
  if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
    // The lookup failed for a reason other than "not found".
    goto CREATE_TOPIC_OVER;
  }

  pDb = mndAcquireDb(pMnode, createTopicReq.subDbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto CREATE_TOPIC_OVER;
  }

  pUser = mndAcquireUser(pMnode, pReq->conn.user);
  if (pUser == NULL) goto CREATE_TOPIC_OVER;

  if (mndCheckWriteAuth(pUser, pDb) != 0) goto CREATE_TOPIC_OVER;

  code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

CREATE_TOPIC_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
  }

  mndReleaseTopic(pMnode, pExisting);
  mndReleaseDb(pMnode, pDb);
  mndReleaseUser(pMnode, pUser);

  tFreeSCMCreateTopicReq(&createTopicReq);
  return code;
}

503
/*
 * Appends the "dropped" commit log for a topic to the transaction and
 * prepares the transaction.
 *
 * The transaction handle is dropped before returning on every path, success
 * or failure. Returns 0 on success and -1 otherwise.
 */
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
  (void)pReq;

  int32_t  result = -1;
  SSdbRaw *pRaw = mndTopicActionEncode(pTopic);

  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
  } else {
    sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

    if (mndTransPrepare(pMnode, pTrans) != 0) {
      mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    } else {
      result = 0;
    }
  }

  mndTransDrop(pTrans);
  return result;
}

S
Shengliang Guan 已提交
522
/*
 * Message handler for TDMT_MND_DROP_TOPIC.
 *
 * Refuses to drop a topic that is still listed in the assignedTopics of any
 * consumer (consumers in MQ_CONSUMER_STATUS__LOST_REBD are skipped). Otherwise
 * a transaction is created that drops the topic's offsets, its subscriptions
 * and finally the topic itself.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the drop transaction was started,
 * 0 when the topic does not exist and igNotExists is set, and -1 (terrno set)
 * otherwise.
 *
 * Every exit path releases the topic and consumer references it acquired,
 * cancels the consumer iterator when leaving the scan early, and drops the
 * transaction if it was created.
 */
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  SMDropTopicReq dropReq = {0};

  if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
  if (pTopic == NULL) {
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
      return -1;
    }
  }

  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) {
      // Skipped consumers still hold a reference taken by sdbFetch().
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *name = taosArrayGetP(pConsumer->assignedTopics, i);
      if (strcmp(name, pTopic->name) == 0) {
        // Log while the consumer reference is still held.
        mError("topic:%s, failed to drop since subscribed by consumer %ld in consumer group %s", dropReq.name,
               pConsumer->consumerId, pConsumer->cgroup);
        sdbCancelFetch(pSdb, pIter);
        mndReleaseConsumer(pMnode, pConsumer);
        mndReleaseTopic(pMnode, pTopic);
        terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
        return -1;
      }
    }
    sdbRelease(pSdb, pConsumer);
  }

#if 0
  if (pTopic->refConsumerCnt != 0) {
    mndReleaseTopic(pMnode, pTopic);
    terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
    return -1;
  }
#endif

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
  if (pTrans == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    mndReleaseTopic(pMnode, pTopic);
    return -1;
  }
  // Only touch the transaction after it is known to exist.
  mndTransSetDbName(pTrans, pTopic->db);

  mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);

  if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
    ASSERT(0);
    mndTransDrop(pTrans);
    mndReleaseTopic(pMnode, pTopic);
    return -1;
  }

  if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
    ASSERT(0);
    mndTransDrop(pTrans);
    mndReleaseTopic(pMnode, pTopic);
    return -1;
  }

  // mndDropTopic() drops the transaction itself on every path.
  int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    // terrno already describes the failure; do not overwrite it with -1.
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
    return -1;
  }

  return TSDB_CODE_ACTION_IN_PROGRESS;
}

L
Liu Jicong 已提交
606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632
/*
 * Counts the topics whose dbUid matches the database named `dbName` and
 * stores the count in *pNumOfTopics.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_MND_DB_NOT_SELECTED
 * when the database cannot be acquired (the output is then left untouched).
 */
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t matched = 0;
  void   *pCursor = NULL;
  for (;;) {
    SMqTopicObj *pEntry = NULL;
    pCursor = sdbFetch(pSdb, SDB_TOPIC, pCursor, (void **)&pEntry);
    if (pCursor == NULL) {
      break;
    }

    if (pEntry->dbUid == pDb->uid) {
      ++matched;
    }
    sdbRelease(pSdb, pEntry);
  }

  mndReleaseDb(pMnode, pDb);
  *pNumOfTopics = matched;
  return 0;
}

S
Shengliang Guan 已提交
633 634
/*
 * Show-retrieve handler for the topics table.
 *
 * Continues the sdb scan stored in pShow->pIter and writes one row per topic
 * into pBlock until either `rowsCapacity` rows were produced or the scan is
 * exhausted. The columns are, in order: topic name, database name, creation
 * time and the SQL text (copied with a TSDB_SHOW_SQL_LEN bound).
 *
 * Returns the number of rows written, which is also added to pShow->numOfRows.
 */
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode      *pMnode = pReq->info.node;
  SSdb        *pSdb = pMnode->pSdb;
  SMqTopicObj *pTopic = NULL;
  int32_t      row = 0;

  for (; row < rowsCapacity; row++) {
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) break;

    SColumnInfoData *pColInfo;
    SName            parsed;
    int32_t          colIdx = 0;

    // Topic name: the full name is parsed as "acct.db" and the second part
    // is emitted as a var-string.
    char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&parsed, pTopic->name, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&parsed, varDataVal(topicName));
    varDataSetLen(topicName, strlen(varDataVal(topicName)));
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pColInfo, row, (const char *)topicName, false);

    // Database name, extracted the same way from the full db name.
    char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&parsed, pTopic->db, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&parsed, varDataVal(dbName));
    varDataSetLen(dbName, strlen(varDataVal(dbName)));
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pColInfo, row, (const char *)dbName, false);

    // Creation time.
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pColInfo, row, (const char *)&pTopic->createTime, false);

    // SQL text, bounded by TSDB_SHOW_SQL_LEN.
    char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
    varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pColInfo, row, (const char *)sql, false);

    sdbRelease(pSdb, pTopic);
  }

  pShow->numOfRows += row;
  return row;
}

678 679 680 681 682
/*
 * Encodes the topic, appends it to the transaction's commit log and marks the
 * raw record SDB_STATUS_READY.
 *
 * Returns 0 on success, -1 as soon as any of the three steps fails.
 */
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  (void)pMnode;

  SSdbRaw *pRaw = mndTopicActionEncode(pTopic);

  // Short-circuit evaluation keeps the steps in order and stops at the first failure.
  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
687 688 689 690 691 692 693 694 695
/*
 * Encodes the topic, appends it to the transaction's commit log and marks the
 * raw record SDB_STATUS_DROPPED.
 *
 * Returns 0 on success, -1 as soon as any of the three steps fails.
 */
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  (void)pMnode;

  SSdbRaw *pRaw = mndTopicActionEncode(pTopic);

  // Short-circuit evaluation keeps the steps in order and stops at the first failure.
  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
696 697 698 699
/* Show free-iter handler: cancels an unfinished sdb scan over the topics. */
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}
L
Liu Jicong 已提交
700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724

/*
 * Adds a "dropped" commit log to `pTrans` for every topic that belongs to the
 * database `pDb` (matched by dbUid).
 *
 * Returns 0 when all matching topics were queued, -1 as soon as queuing one
 * of them fails. Each fetched topic is released after use, and the sdb
 * iterator is cancelled when the scan is abandoned early.
 */
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb        *pSdb = pMnode->pSdb;
  void        *pIter = NULL;
  SMqTopicObj *pTopic = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid != pDb->uid) {
      sdbRelease(pSdb, pTopic);
      continue;
    }

    if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
      // Abandoning the scan: cancel the iterator and drop our reference.
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pTopic);
      return -1;
    }

    // The commit log holds an encoded copy; the reference is no longer needed.
    sdbRelease(pSdb, pTopic);
  }

  return 0;
}