mndTopic.c 22.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndTopic.h"
S
Shengliang Guan 已提交
17
#include "mndAuth.h"
L
Liu Jicong 已提交
18
#include "mndConsumer.h"
L
Liu Jicong 已提交
19 20 21
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
22
#include "mndOffset.h"
L
Liu Jicong 已提交
23
#include "mndShow.h"
S
Shengliang Guan 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndSubscribe.h"
L
Liu Jicong 已提交
26 27 28
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
29
#include "parser.h"
L
Liu Jicong 已提交
30 31
#include "tname.h"

L
Liu Jicong 已提交
32
#define MND_TOPIC_VER_NUMBER   1
S
Shengliang Guan 已提交
33
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
34

L
Liu Jicong 已提交
35 36 37
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
38 39 40
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp);
L
Liu Jicong 已提交
41

S
Shengliang Guan 已提交
42
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
43
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45 46
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);

L
Liu Jicong 已提交
47 48 49 50 51 52 53 54 55
int32_t mndInitTopic(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_TOPIC,
                     .keyType = SDB_KEY_BINARY,
                     .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
                     .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
                     .insertFp = (SdbInsertFp)mndTopicActionInsert,
                     .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndTopicActionDelete};

S
Shengliang Guan 已提交
56 57
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
H
Hongze Cheng 已提交
58
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
61
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
S
Shengliang Guan 已提交
62

L
Liu Jicong 已提交
63 64 65 66 67
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupTopic(SMnode *pMnode) {}

L
Liu Jicong 已提交
68 69 70 71 72
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
  //
  return strchr(topic, '.') + 1;
}

L
Liu Jicong 已提交
73
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
74 75
  terrno = TSDB_CODE_OUT_OF_MEMORY;

L
Liu Jicong 已提交
76 77 78 79 80 81 82
  void   *swBuf = NULL;
  int32_t physicalPlanLen = 0;
  if (pTopic->physicalPlan) {
    physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
  }
  int32_t schemaLen = 0;
  if (pTopic->schema.nCols) {
L
Liu Jicong 已提交
83
    schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
L
Liu Jicong 已提交
84
  }
L
fix  
Liu Jicong 已提交
85
  int32_t size =
L
Liu Jicong 已提交
86
      sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
S
Shengliang Guan 已提交
87
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
L
Liu Jicong 已提交
88
  if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
L
Liu Jicong 已提交
89 90

  int32_t dataPos = 0;
L
Liu Jicong 已提交
91
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
92 93 94 95 96 97
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
98
  SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
99

L
Liu Jicong 已提交
100
  SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
101 102
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
103
  SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
104 105 106
  if (pTopic->astLen) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
  }
L
Liu Jicong 已提交
107
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
108 109
  if (physicalPlanLen) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
110
  }
L
Liu Jicong 已提交
111
  SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
112 113 114 115 116 117 118 119 120 121
  if (schemaLen) {
    swBuf = taosMemoryMalloc(schemaLen);
    if (swBuf == NULL) {
      goto TOPIC_ENCODE_OVER;
    }
    void *aswBuf = swBuf;
    taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
    SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
  }

L
Liu Jicong 已提交
122 123 124 125 126 127
  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_ENCODE_OVER:
L
Liu Jicong 已提交
128
  if (swBuf) taosMemoryFree(swBuf);
L
Liu Jicong 已提交
129 130 131 132 133 134 135
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
136 137 138
  return pRaw;
}

L
Liu Jicong 已提交
139
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
140
  terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
141
  int8_t sver = 0;
142
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
143

S
Shengliang Guan 已提交
144
  if (sver != MND_TOPIC_VER_NUMBER) {
L
Liu Jicong 已提交
145
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
146
    goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
147 148
  }

L
Liu Jicong 已提交
149 150
  int32_t  size = sizeof(SMqTopicObj);
  SSdbRow *pRow = sdbAllocRow(size);
151 152
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

L
Liu Jicong 已提交
153
  SMqTopicObj *pTopic = sdbGetRowObj(pRow);
154
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
155

L
Liu Jicong 已提交
156
  int32_t len;
L
Liu Jicong 已提交
157
  int32_t dataPos = 0;
L
Liu Jicong 已提交
158
  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
159 160 161 162 163 164
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
165
  SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
166

L
Liu Jicong 已提交
167
  SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
168 169 170 171 172 173
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
  if (pTopic->sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
L
Liu Jicong 已提交
174
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
175

L
Liu Jicong 已提交
176
  SDB_GET_INT32(pRaw, dataPos, &pTopic->astLen, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
177 178 179 180 181 182 183 184
  if (pTopic->astLen) {
    pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char));
    if (pTopic->ast == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
  } else {
    pTopic->ast = NULL;
185
  }
L
Liu Jicong 已提交
186
  SDB_GET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_DECODE_OVER);
187
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
188 189 190 191 192 193 194 195 196
  if (len) {
    pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
    if (pTopic->physicalPlan == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
  } else {
    pTopic->physicalPlan = NULL;
197
  }
198

L
Liu Jicong 已提交
199
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
200 201 202 203 204 205 206 207 208 209 210 211
  if (len) {
    void *buf = taosMemoryMalloc(len);
    if (buf == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
    if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
      goto TOPIC_DECODE_OVER;
    }
  } else {
    pTopic->schema.nCols = 0;
212
    pTopic->schema.version = 0;
L
Liu Jicong 已提交
213
    pTopic->schema.pSchema = NULL;
L
Liu Jicong 已提交
214 215
  }

L
Liu Jicong 已提交
216
  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
217

L
Liu Jicong 已提交
218
  terrno = TSDB_CODE_SUCCESS;
219 220

TOPIC_DECODE_OVER:
L
Liu Jicong 已提交
221
  if (terrno != TSDB_CODE_SUCCESS) {
222
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
223
    taosMemoryFreeClear(pRow);
224 225
    return NULL;
  }
L
Liu Jicong 已提交
226

227
  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
228 229 230
  return pRow;
}

L
Liu Jicong 已提交
231
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
232 233 234 235
  mTrace("topic:%s, perform insert action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
236
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
237 238 239 240
  mTrace("topic:%s, perform delete action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
241
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
L
Liu Jicong 已提交
242
  mTrace("topic:%s, perform update action", pOldTopic->name);
wafwerar's avatar
wafwerar 已提交
243
  atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
L
Liu Jicong 已提交
244 245
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

L
Liu Jicong 已提交
246
  /*taosWLockLatch(&pOldTopic->lock);*/
L
Liu Jicong 已提交
247 248

  // TODO handle update
L
Liu Jicong 已提交
249

L
Liu Jicong 已提交
250
  /*taosWUnLockLatch(&pOldTopic->lock);*/
L
Liu Jicong 已提交
251 252 253
  return 0;
}

L
Liu Jicong 已提交
254 255 256
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SSdb        *pSdb = pMnode->pSdb;
  SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
S
Shengliang Guan 已提交
257
  if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
L
Liu Jicong 已提交
258 259 260 261 262
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return pTopic;
}

L
Liu Jicong 已提交
263
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
264 265 266 267
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pTopic);
}

S
Shengliang Guan 已提交
268 269
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  int32_t contLen = sizeof(SDDropTopicReq);
L
Liu Jicong 已提交
270

wafwerar's avatar
wafwerar 已提交
271
  SDDropTopicReq *pDrop = taosMemoryCalloc(1, contLen);
L
Liu Jicong 已提交
272 273 274 275 276 277 278
  if (pDrop == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pDrop->head.contLen = htonl(contLen);
  pDrop->head.vgId = htonl(pVgroup->vgId);
L
Liu Jicong 已提交
279
  memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
L
Liu Jicong 已提交
280 281 282 283 284
  pDrop->tuid = htobe64(pTopic->uid);

  return pDrop;
}

L
Liu Jicong 已提交
285
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
L
Liu Jicong 已提交
286 287 288 289 290 291 292 293 294 295
  terrno = TSDB_CODE_MND_INVALID_TOPIC;

  if (pCreate->sql == NULL) return -1;

  if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
    if (pCreate->ast == NULL || pCreate->ast[0] == 0) return -1;
  } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
    if (pCreate->subStbName[0] == 0) return -1;
  } else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {
    if (pCreate->subDbName[0] == 0) return -1;
S
Shengliang Guan 已提交
296
  }
297

L
Liu Jicong 已提交
298
  terrno = TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
299
  return 0;
300 301
}

S
Shengliang Guan 已提交
302
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
L
Liu Jicong 已提交
303
  mDebug("topic:%s to create", pCreate->name);
L
Liu Jicong 已提交
304
  SMqTopicObj topicObj = {0};
305
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
306
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
307 308
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
L
Liu Jicong 已提交
309
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
L
Liu Jicong 已提交
310 311
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;
L
Liu Jicong 已提交
312 313
  topicObj.sql = strdup(pCreate->sql);
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
L
Liu Jicong 已提交
314
  topicObj.subType = pCreate->subType;
L
Liu Jicong 已提交
315

L
Liu Jicong 已提交
316
  if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
317 318
    topicObj.ast = strdup(pCreate->ast);
    topicObj.astLen = strlen(pCreate->ast) + 1;
L
Liu Jicong 已提交
319

L
Liu Jicong 已提交
320 321
    SNode *pAst = NULL;
    if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
L
Liu Jicong 已提交
322 323
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
324 325 326
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      return -1;
    }
L
Liu Jicong 已提交
327

L
Liu Jicong 已提交
328
    SQueryPlan *pPlan = NULL;
L
Liu Jicong 已提交
329

L
Liu Jicong 已提交
330 331 332
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
333 334
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
335 336 337 338 339
      return -1;
    }

    if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
340 341
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
342 343 344 345 346
      return -1;
    }

    if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
347 348
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
349 350
      return -1;
    }
L
Liu Jicong 已提交
351
  } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
L
Liu Jicong 已提交
352 353
    SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
    topicObj.stbUid = pStb->uid;
L
Liu Jicong 已提交
354
  }
L
Liu Jicong 已提交
355 356 357 358 359 360
  /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
  /*topicObj.ast = NULL;*/
  /*topicObj.astLen = 0;*/
  /*topicObj.physicalPlan = NULL;*/
  /*topicObj.withTbName = 1;*/
  /*topicObj.withSchema = 1;*/
L
Liu Jicong 已提交
361

362
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
S
Shengliang Guan 已提交
363 364
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
365 366
    taosMemoryFreeClear(topicObj.ast);
    taosMemoryFreeClear(topicObj.sql);
L
Liu Jicong 已提交
367
    taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
368 369 370 371
    return -1;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

372 373 374
  SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
375
    taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
376 377 378
    mndTransDrop(pTrans);
    return -1;
  }
379
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
S
Shengliang Guan 已提交
380 381 382

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
383
    taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
384 385 386 387
    mndTransDrop(pTrans);
    return -1;
  }

L
Liu Jicong 已提交
388
  taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
389 390
  mndTransDrop(pTrans);
  return 0;
L
Liu Jicong 已提交
391 392
}

S
Shengliang Guan 已提交
393 394
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
L
Liu Jicong 已提交
395 396 397 398 399 400
  int32_t           code = -1;
  SMqTopicObj      *pTopic = NULL;
  SDbObj           *pDb = NULL;
  SUserObj         *pUser = NULL;
  SCMCreateTopicReq createTopicReq = {0};

S
Shengliang Guan 已提交
401
  if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
S
Shengliang Guan 已提交
402 403 404
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_TOPIC_OVER;
  }
405 406

  mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
L
Liu Jicong 已提交
407

S
Shengliang Guan 已提交
408
  if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
409
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
S
Shengliang Guan 已提交
410
    goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
411 412
  }

S
Shengliang Guan 已提交
413
  pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
L
Liu Jicong 已提交
414
  if (pTopic != NULL) {
415 416
    if (createTopicReq.igExists) {
      mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
S
Shengliang Guan 已提交
417 418
      code = 0;
      goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
419 420
    } else {
      terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
S
Shengliang Guan 已提交
421
      goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
422
    }
S
Shengliang Guan 已提交
423 424
  } else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
    goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
425 426
  }

L
Liu Jicong 已提交
427
  pDb = mndAcquireDb(pMnode, createTopicReq.subDbName);
L
Liu Jicong 已提交
428 429
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
S
Shengliang Guan 已提交
430 431 432
    goto CREATE_TOPIC_OVER;
  }

S
Shengliang Guan 已提交
433
  pUser = mndAcquireUser(pMnode, pReq->conn.user);
S
Shengliang Guan 已提交
434 435 436 437 438 439 440 441 442
  if (pUser == NULL) {
    goto CREATE_TOPIC_OVER;
  }

  if (mndCheckWriteAuth(pUser, pDb) != 0) {
    goto CREATE_TOPIC_OVER;
  }

  code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
S
Shengliang Guan 已提交
443
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
S
Shengliang Guan 已提交
444 445

CREATE_TOPIC_OVER:
S
Shengliang Guan 已提交
446
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
447
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
L
Liu Jicong 已提交
448 449
  }

S
Shengliang Guan 已提交
450
  mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
451
  mndReleaseDb(pMnode, pDb);
S
Shengliang Guan 已提交
452
  mndReleaseUser(pMnode, pUser);
L
Liu Jicong 已提交
453

L
Liu Jicong 已提交
454
  tFreeSCMCreateTopicReq(&createTopicReq);
S
Shengliang Guan 已提交
455 456 457
  return code;
}

458
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
459 460 461
  SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
462 463 464
    mndTransDrop(pTrans);
    return -1;
  }
465
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
S
Shengliang Guan 已提交
466 467 468 469

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
L
Liu Jicong 已提交
470 471 472
    return -1;
  }

S
Shengliang Guan 已提交
473
  mndTransDrop(pTrans);
L
Liu Jicong 已提交
474
  return 0;
L
Liu Jicong 已提交
475 476
}

S
Shengliang Guan 已提交
477
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
L
Liu Jicong 已提交
478 479
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
480
  SMDropTopicReq dropReq = {0};
L
Liu Jicong 已提交
481

S
Shengliang Guan 已提交
482
  if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
S
Shengliang Guan 已提交
483
    terrno = TSDB_CODE_INVALID_MSG;
484
    return -1;
L
Liu Jicong 已提交
485 486
  }

487
  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
L
Liu Jicong 已提交
488 489 490 491 492 493 494 495 496 497
  if (pTopic == NULL) {
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
      return -1;
    }
  }
S
Shengliang Guan 已提交
498

L
Liu Jicong 已提交
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
    int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *name = taosArrayGetP(pConsumer->assignedTopics, i);
      if (strcmp(name, pTopic->name) == 0) {
        mndReleaseConsumer(pMnode, pConsumer);
        mndReleaseTopic(pMnode, pTopic);
        terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
        mError("topic:%s, failed to drop since subscribed by consumer %ld from cgroup %s", dropReq.name,
               pConsumer->consumerId, pConsumer->cgroup);
        return -1;
      }
    }
    sdbRelease(pSdb, pConsumer);
  }

#if 0
L
Liu Jicong 已提交
522
  if (pTopic->refConsumerCnt != 0) {
523
    mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
524 525
    terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
S
Shengliang Guan 已提交
526 527
    return -1;
  }
L
Liu Jicong 已提交
528
#endif
S
Shengliang Guan 已提交
529

530
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
L
Liu Jicong 已提交
531 532 533 534
  if (pTrans == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    return -1;
  }
L
Liu Jicong 已提交
535

L
Liu Jicong 已提交
536 537 538 539 540
  mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);

  if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
    ASSERT(0);
    return -1;
L
Liu Jicong 已提交
541 542
  }

L
Liu Jicong 已提交
543 544 545 546 547 548
  if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
    ASSERT(0);
    return -1;
  }

  int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
L
Liu Jicong 已提交
549 550 551 552
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    terrno = code;
S
Shengliang Guan 已提交
553
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
L
Liu Jicong 已提交
554 555 556
    return -1;
  }

S
Shengliang Guan 已提交
557
  return TSDB_CODE_ACTION_IN_PROGRESS;
L
Liu Jicong 已提交
558 559
}

S
Shengliang Guan 已提交
560
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
561
  mndTransProcessRsp(pRsp);
L
Liu Jicong 已提交
562 563 564
  return 0;
}

L
Liu Jicong 已提交
565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t numOfTopics = 0;
  void   *pIter = NULL;
  while (1) {
    SMqTopicObj *pTopic = NULL;
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid == pDb->uid) {
      numOfTopics++;
    }

    sdbRelease(pSdb, pTopic);
  }

  *pNumOfTopics = numOfTopics;
  mndReleaseDb(pMnode, pDb);
  return 0;
}

S
Shengliang Guan 已提交
592 593
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode      *pMnode = pReq->info.node;
L
Liu Jicong 已提交
594 595 596
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      numOfRows = 0;
  SMqTopicObj *pTopic = NULL;
S
Shengliang Guan 已提交
597

L
Liu Jicong 已提交
598
  while (numOfRows < rowsCapacity) {
L
Liu Jicong 已提交
599 600 601
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) break;

L
Liu Jicong 已提交
602 603 604
    SColumnInfoData *pColInfo;
    SName            n;
    int32_t          cols = 0;
S
Shengliang Guan 已提交
605

L
Liu Jicong 已提交
606
    char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
L
Liu Jicong 已提交
607
    tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);
608 609
    tNameGetDbName(&n, varDataVal(topicName));
    varDataSetLen(topicName, strlen(varDataVal(topicName)));
L
Liu Jicong 已提交
610
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
611
    colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
L
Liu Jicong 已提交
612

L
Liu Jicong 已提交
613 614 615 616
    char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&n, varDataVal(dbName));
    varDataSetLen(dbName, strlen(varDataVal(dbName)));
L
Liu Jicong 已提交
617
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
618
    colDataAppend(pColInfo, numOfRows, (const char *)dbName, false);
L
Liu Jicong 已提交
619

L
Liu Jicong 已提交
620
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
621 622
    colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);

L
Liu Jicong 已提交
623 624
    char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
L
Liu Jicong 已提交
625
    varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
L
Liu Jicong 已提交
626
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
627
    colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
L
Liu Jicong 已提交
628 629 630 631 632

    numOfRows++;
    sdbRelease(pSdb, pTopic);
  }

633
  pShow->numOfRows += numOfRows;
L
Liu Jicong 已提交
634 635 636
  return numOfRows;
}

637 638 639 640 641
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
L
Liu Jicong 已提交
642 643 644 645

  return 0;
}

L
Liu Jicong 已提交
646 647 648 649 650 651 652 653 654
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;

  return 0;
}

L
Liu Jicong 已提交
655 656 657 658
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}
L
Liu Jicong 已提交
659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683

int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = -1;
  SSdb   *pSdb = pMnode->pSdb;

  void        *pIter = NULL;
  SMqTopicObj *pTopic = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid != pDb->uid) {
      sdbRelease(pSdb, pTopic);
      continue;
    }

    if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
      goto END;
    }
  }

  code = 0;
END:
  return code;
}