mndTopic.c 22.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndTopic.h"
S
Shengliang Guan 已提交
17
#include "mndAuth.h"
L
Liu Jicong 已提交
18
#include "mndConsumer.h"
L
Liu Jicong 已提交
19 20 21
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
22
#include "mndOffset.h"
L
Liu Jicong 已提交
23
#include "mndShow.h"
S
Shengliang Guan 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndSubscribe.h"
L
Liu Jicong 已提交
26 27 28
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
29
#include "parser.h"
L
Liu Jicong 已提交
30 31
#include "tname.h"

L
Liu Jicong 已提交
32
#define MND_TOPIC_VER_NUMBER   1
S
Shengliang Guan 已提交
33
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
34

L
Liu Jicong 已提交
35 36 37
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
38 39 40
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp);
L
Liu Jicong 已提交
41

S
Shengliang Guan 已提交
42
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
43
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45 46
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);

L
Liu Jicong 已提交
47 48 49 50 51 52 53 54 55
int32_t mndInitTopic(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_TOPIC,
                     .keyType = SDB_KEY_BINARY,
                     .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
                     .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
                     .insertFp = (SdbInsertFp)mndTopicActionInsert,
                     .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndTopicActionDelete};

S
Shengliang Guan 已提交
56 57
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
H
Hongze Cheng 已提交
58
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
61
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
S
Shengliang Guan 已提交
62

L
Liu Jicong 已提交
63 64 65 66 67
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupTopic(SMnode *pMnode) {}

L
Liu Jicong 已提交
68 69 70 71 72
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
  //
  return strchr(topic, '.') + 1;
}

L
Liu Jicong 已提交
73
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
74 75
  terrno = TSDB_CODE_OUT_OF_MEMORY;

L
Liu Jicong 已提交
76 77 78 79 80 81 82
  void   *swBuf = NULL;
  int32_t physicalPlanLen = 0;
  if (pTopic->physicalPlan) {
    physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
  }
  int32_t schemaLen = 0;
  if (pTopic->schema.nCols) {
L
Liu Jicong 已提交
83
    schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
L
Liu Jicong 已提交
84
  }
L
fix  
Liu Jicong 已提交
85
  int32_t size =
L
Liu Jicong 已提交
86
      sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
S
Shengliang Guan 已提交
87
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
L
Liu Jicong 已提交
88
  if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
L
Liu Jicong 已提交
89 90

  int32_t dataPos = 0;
L
Liu Jicong 已提交
91
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
92 93 94 95 96 97
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
98 99 100 101
  SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
102 103

  SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
104 105
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
106
  SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
107 108 109
  if (pTopic->astLen) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
  }
L
Liu Jicong 已提交
110
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
111 112
  if (physicalPlanLen) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
113
  }
L
Liu Jicong 已提交
114
  SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
115 116 117 118 119 120 121 122 123 124
  if (schemaLen) {
    swBuf = taosMemoryMalloc(schemaLen);
    if (swBuf == NULL) {
      goto TOPIC_ENCODE_OVER;
    }
    void *aswBuf = swBuf;
    taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
    SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
  }

L
Liu Jicong 已提交
125
  /*SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);*/
L
Liu Jicong 已提交
126

L
Liu Jicong 已提交
127 128 129 130 131 132
  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_ENCODE_OVER:
L
Liu Jicong 已提交
133
  if (swBuf) taosMemoryFree(swBuf);
L
Liu Jicong 已提交
134 135 136 137 138 139 140
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
141 142 143
  return pRaw;
}

L
Liu Jicong 已提交
144
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
145
  terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
146
  int8_t sver = 0;
147
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
148

S
Shengliang Guan 已提交
149
  if (sver != MND_TOPIC_VER_NUMBER) {
L
Liu Jicong 已提交
150
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
151
    goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
152 153
  }

L
Liu Jicong 已提交
154 155
  int32_t  size = sizeof(SMqTopicObj);
  SSdbRow *pRow = sdbAllocRow(size);
156 157
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

L
Liu Jicong 已提交
158
  SMqTopicObj *pTopic = sdbGetRowObj(pRow);
159
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
160

L
Liu Jicong 已提交
161
  int32_t len;
L
Liu Jicong 已提交
162
  int32_t dataPos = 0;
L
Liu Jicong 已提交
163
  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
164 165 166 167 168 169
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
170 171 172 173
  SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);
174

L
Liu Jicong 已提交
175 176
  SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER);

L
Liu Jicong 已提交
177 178 179 180 181 182
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
  if (pTopic->sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
L
Liu Jicong 已提交
183
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
184

L
Liu Jicong 已提交
185
  SDB_GET_INT32(pRaw, dataPos, &pTopic->astLen, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
186 187 188 189 190 191 192 193
  if (pTopic->astLen) {
    pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char));
    if (pTopic->ast == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
  } else {
    pTopic->ast = NULL;
194
  }
L
Liu Jicong 已提交
195
  SDB_GET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_DECODE_OVER);
196
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
197 198 199 200 201 202 203 204 205
  if (len) {
    pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
    if (pTopic->physicalPlan == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
  } else {
    pTopic->physicalPlan = NULL;
206
  }
207

L
Liu Jicong 已提交
208
  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
209 210 211 212 213 214 215 216 217 218 219 220
  if (len) {
    void *buf = taosMemoryMalloc(len);
    if (buf == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
    if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
      goto TOPIC_DECODE_OVER;
    }
  } else {
    pTopic->schema.nCols = 0;
221
    pTopic->schema.version = 0;
L
Liu Jicong 已提交
222
    pTopic->schema.pSchema = NULL;
L
Liu Jicong 已提交
223 224
  }

L
Liu Jicong 已提交
225
  /*SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);*/
L
Liu Jicong 已提交
226

L
Liu Jicong 已提交
227
  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
228

L
Liu Jicong 已提交
229
  terrno = TSDB_CODE_SUCCESS;
230 231

TOPIC_DECODE_OVER:
L
Liu Jicong 已提交
232
  if (terrno != TSDB_CODE_SUCCESS) {
233
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
234
    taosMemoryFreeClear(pRow);
235 236
    return NULL;
  }
L
Liu Jicong 已提交
237

238
  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
239 240 241
  return pRow;
}

L
Liu Jicong 已提交
242
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
243 244 245 246
  mTrace("topic:%s, perform insert action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
247
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
248 249 250 251
  mTrace("topic:%s, perform delete action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
252
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
L
Liu Jicong 已提交
253
  mTrace("topic:%s, perform update action", pOldTopic->name);
wafwerar's avatar
wafwerar 已提交
254
  atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
L
Liu Jicong 已提交
255 256
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

L
Liu Jicong 已提交
257
  /*atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt);*/
L
Liu Jicong 已提交
258 259

  /*taosWLockLatch(&pOldTopic->lock);*/
L
Liu Jicong 已提交
260 261

  // TODO handle update
L
Liu Jicong 已提交
262

L
Liu Jicong 已提交
263
  /*taosWUnLockLatch(&pOldTopic->lock);*/
L
Liu Jicong 已提交
264 265 266
  return 0;
}

L
Liu Jicong 已提交
267 268 269
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SSdb        *pSdb = pMnode->pSdb;
  SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
S
Shengliang Guan 已提交
270
  if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
L
Liu Jicong 已提交
271 272 273 274 275
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return pTopic;
}

L
Liu Jicong 已提交
276
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
277 278 279 280
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pTopic);
}

L
Liu Jicong 已提交
281
#if 0
L
Liu Jicong 已提交
282 283
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
  SName name = {0};
284
  tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
L
Liu Jicong 已提交
285

L
Liu Jicong 已提交
286
  char db[TSDB_TOPIC_FNAME_LEN] = {0};
L
Liu Jicong 已提交
287 288 289 290
  tNameGetFullDbName(&name, db);

  return mndAcquireDb(pMnode, db);
}
L
Liu Jicong 已提交
291
#endif
L
Liu Jicong 已提交
292

S
Shengliang Guan 已提交
293 294
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  int32_t contLen = sizeof(SDDropTopicReq);
L
Liu Jicong 已提交
295

wafwerar's avatar
wafwerar 已提交
296
  SDDropTopicReq *pDrop = taosMemoryCalloc(1, contLen);
L
Liu Jicong 已提交
297 298 299 300 301 302 303
  if (pDrop == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pDrop->head.contLen = htonl(contLen);
  pDrop->head.vgId = htonl(pVgroup->vgId);
L
Liu Jicong 已提交
304
  memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
L
Liu Jicong 已提交
305 306 307 308 309
  pDrop->tuid = htobe64(pTopic->uid);

  return pDrop;
}

L
Liu Jicong 已提交
310
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
L
Liu Jicong 已提交
311
  if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->subscribeDbName[0] == 0) {
L
Liu Jicong 已提交
312
    terrno = TSDB_CODE_MND_INVALID_TOPIC;
S
Shengliang Guan 已提交
313 314
    return -1;
  }
315

L
Liu Jicong 已提交
316
  return 0;
317 318
}

S
Shengliang Guan 已提交
319
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
L
Liu Jicong 已提交
320
  mDebug("topic:%s to create", pCreate->name);
L
Liu Jicong 已提交
321
  SMqTopicObj topicObj = {0};
322
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
323
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
324 325
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
L
Liu Jicong 已提交
326
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
L
Liu Jicong 已提交
327 328
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;
L
Liu Jicong 已提交
329 330
  topicObj.sql = strdup(pCreate->sql);
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
L
Liu Jicong 已提交
331
  /*topicObj.refConsumerCnt = 0;*/
L
Liu Jicong 已提交
332

L
Liu Jicong 已提交
333
  if (pCreate->ast && pCreate->ast[0]) {
L
Liu Jicong 已提交
334 335
    topicObj.ast = strdup(pCreate->ast);
    topicObj.astLen = strlen(pCreate->ast) + 1;
L
Liu Jicong 已提交
336
    topicObj.subType = TOPIC_SUB_TYPE__TABLE;
L
Liu Jicong 已提交
337 338
    topicObj.withTbName = pCreate->withTbName;
    topicObj.withSchema = pCreate->withSchema;
L
Liu Jicong 已提交
339

L
Liu Jicong 已提交
340 341
    SNode *pAst = NULL;
    if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
L
Liu Jicong 已提交
342 343
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
344 345 346
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      return -1;
    }
L
Liu Jicong 已提交
347

L
Liu Jicong 已提交
348
    SQueryPlan *pPlan = NULL;
L
Liu Jicong 已提交
349

L
Liu Jicong 已提交
350 351 352
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
353 354
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
355 356 357 358 359
      return -1;
    }

    if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
360 361
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
362 363 364 365 366
      return -1;
    }

    if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
367 368
      taosMemoryFree(topicObj.ast);
      taosMemoryFree(topicObj.sql);
L
Liu Jicong 已提交
369 370 371
      return -1;
    }
  } else {
L
Liu Jicong 已提交
372 373 374
    topicObj.ast = NULL;
    topicObj.astLen = 0;
    topicObj.physicalPlan = NULL;
L
Liu Jicong 已提交
375 376 377
    topicObj.subType = TOPIC_SUB_TYPE__DB;
    topicObj.withTbName = 1;
    topicObj.withSchema = 1;
L
Liu Jicong 已提交
378 379
  }

380
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
S
Shengliang Guan 已提交
381 382
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
L
Liu Jicong 已提交
383 384
    taosMemoryFreeClear(topicObj.ast);
    taosMemoryFreeClear(topicObj.sql);
L
Liu Jicong 已提交
385
    taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
386 387 388 389
    return -1;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

390 391 392
  SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
393
    taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
394 395 396
    mndTransDrop(pTrans);
    return -1;
  }
397
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
S
Shengliang Guan 已提交
398 399 400

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
L
Liu Jicong 已提交
401
    taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
402 403 404 405
    mndTransDrop(pTrans);
    return -1;
  }

L
Liu Jicong 已提交
406
  taosMemoryFreeClear(topicObj.physicalPlan);
S
Shengliang Guan 已提交
407 408
  mndTransDrop(pTrans);
  return 0;
L
Liu Jicong 已提交
409 410
}

S
Shengliang Guan 已提交
411 412
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
L
Liu Jicong 已提交
413 414 415 416 417 418
  int32_t           code = -1;
  SMqTopicObj      *pTopic = NULL;
  SDbObj           *pDb = NULL;
  SUserObj         *pUser = NULL;
  SCMCreateTopicReq createTopicReq = {0};

S
Shengliang Guan 已提交
419
  if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
S
Shengliang Guan 已提交
420 421 422
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_TOPIC_OVER;
  }
423 424

  mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
L
Liu Jicong 已提交
425

S
Shengliang Guan 已提交
426
  if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
427
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
S
Shengliang Guan 已提交
428
    goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
429 430
  }

S
Shengliang Guan 已提交
431
  pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
L
Liu Jicong 已提交
432
  if (pTopic != NULL) {
433 434
    if (createTopicReq.igExists) {
      mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
S
Shengliang Guan 已提交
435 436
      code = 0;
      goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
437 438
    } else {
      terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
S
Shengliang Guan 已提交
439
      goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
440
    }
S
Shengliang Guan 已提交
441 442
  } else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
    goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
443 444
  }

L
Liu Jicong 已提交
445
  pDb = mndAcquireDb(pMnode, createTopicReq.subscribeDbName);
L
Liu Jicong 已提交
446 447
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
S
Shengliang Guan 已提交
448 449 450
    goto CREATE_TOPIC_OVER;
  }

S
Shengliang Guan 已提交
451
  pUser = mndAcquireUser(pMnode, pReq->conn.user);
S
Shengliang Guan 已提交
452 453 454 455 456 457 458 459 460
  if (pUser == NULL) {
    goto CREATE_TOPIC_OVER;
  }

  if (mndCheckWriteAuth(pUser, pDb) != 0) {
    goto CREATE_TOPIC_OVER;
  }

  code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
S
Shengliang Guan 已提交
461
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
S
Shengliang Guan 已提交
462 463

CREATE_TOPIC_OVER:
S
Shengliang Guan 已提交
464
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
465
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
L
Liu Jicong 已提交
466 467
  }

S
Shengliang Guan 已提交
468
  mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
469
  mndReleaseDb(pMnode, pDb);
S
Shengliang Guan 已提交
470
  mndReleaseUser(pMnode, pUser);
L
Liu Jicong 已提交
471

L
Liu Jicong 已提交
472
  tFreeSCMCreateTopicReq(&createTopicReq);
S
Shengliang Guan 已提交
473 474 475
  return code;
}

476
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
477 478 479
  SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
480 481 482
    mndTransDrop(pTrans);
    return -1;
  }
483
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
S
Shengliang Guan 已提交
484 485 486 487

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
L
Liu Jicong 已提交
488 489 490
    return -1;
  }

S
Shengliang Guan 已提交
491
  mndTransDrop(pTrans);
L
Liu Jicong 已提交
492
  return 0;
L
Liu Jicong 已提交
493 494
}

S
Shengliang Guan 已提交
495
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
L
Liu Jicong 已提交
496 497
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
498
  SMDropTopicReq dropReq = {0};
L
Liu Jicong 已提交
499

S
Shengliang Guan 已提交
500
  if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
S
Shengliang Guan 已提交
501
    terrno = TSDB_CODE_INVALID_MSG;
502
    return -1;
L
Liu Jicong 已提交
503 504
  }

505
  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
L
Liu Jicong 已提交
506 507 508 509 510 511 512 513 514 515
  if (pTopic == NULL) {
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
      return -1;
    }
  }
S
Shengliang Guan 已提交
516

L
Liu Jicong 已提交
517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
    int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *name = taosArrayGetP(pConsumer->assignedTopics, i);
      if (strcmp(name, pTopic->name) == 0) {
        mndReleaseConsumer(pMnode, pConsumer);
        mndReleaseTopic(pMnode, pTopic);
        terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
        mError("topic:%s, failed to drop since subscribed by consumer %ld from cgroup %s", dropReq.name,
               pConsumer->consumerId, pConsumer->cgroup);
        return -1;
      }
    }
    sdbRelease(pSdb, pConsumer);
  }

#if 0
L
Liu Jicong 已提交
540
  if (pTopic->refConsumerCnt != 0) {
541
    mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
542 543
    terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
S
Shengliang Guan 已提交
544 545
    return -1;
  }
L
Liu Jicong 已提交
546
#endif
S
Shengliang Guan 已提交
547

548
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
L
Liu Jicong 已提交
549 550 551 552
  if (pTrans == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    return -1;
  }
L
Liu Jicong 已提交
553

L
Liu Jicong 已提交
554 555 556 557 558
  mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);

  if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
    ASSERT(0);
    return -1;
L
Liu Jicong 已提交
559 560
  }

L
Liu Jicong 已提交
561 562 563 564 565 566
  if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
    ASSERT(0);
    return -1;
  }

  int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
L
Liu Jicong 已提交
567 568 569 570
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    terrno = code;
S
Shengliang Guan 已提交
571
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
L
Liu Jicong 已提交
572 573 574
    return -1;
  }

S
Shengliang Guan 已提交
575
  return TSDB_CODE_ACTION_IN_PROGRESS;
L
Liu Jicong 已提交
576 577
}

S
Shengliang Guan 已提交
578
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
579
  mndTransProcessRsp(pRsp);
L
Liu Jicong 已提交
580 581 582
  return 0;
}

L
Liu Jicong 已提交
583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t numOfTopics = 0;
  void   *pIter = NULL;
  while (1) {
    SMqTopicObj *pTopic = NULL;
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid == pDb->uid) {
      numOfTopics++;
    }

    sdbRelease(pSdb, pTopic);
  }

  *pNumOfTopics = numOfTopics;
  mndReleaseDb(pMnode, pDb);
  return 0;
}

S
Shengliang Guan 已提交
610 611
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode      *pMnode = pReq->info.node;
L
Liu Jicong 已提交
612 613 614
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      numOfRows = 0;
  SMqTopicObj *pTopic = NULL;
S
Shengliang Guan 已提交
615

L
Liu Jicong 已提交
616
  while (numOfRows < rowsCapacity) {
L
Liu Jicong 已提交
617 618 619
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) break;

L
Liu Jicong 已提交
620 621 622
    SColumnInfoData *pColInfo;
    SName            n;
    int32_t          cols = 0;
S
Shengliang Guan 已提交
623

L
Liu Jicong 已提交
624
    char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
L
Liu Jicong 已提交
625
    tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);
626 627
    tNameGetDbName(&n, varDataVal(topicName));
    varDataSetLen(topicName, strlen(varDataVal(topicName)));
L
Liu Jicong 已提交
628
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
629
    colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
L
Liu Jicong 已提交
630

L
Liu Jicong 已提交
631 632 633 634
    char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&n, varDataVal(dbName));
    varDataSetLen(dbName, strlen(varDataVal(dbName)));
L
Liu Jicong 已提交
635
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
636
    colDataAppend(pColInfo, numOfRows, (const char *)dbName, false);
L
Liu Jicong 已提交
637

L
Liu Jicong 已提交
638
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
639 640
    colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);

L
Liu Jicong 已提交
641 642
    char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
L
Liu Jicong 已提交
643
    varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
L
Liu Jicong 已提交
644
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
645
    colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
L
Liu Jicong 已提交
646 647 648 649 650

    numOfRows++;
    sdbRelease(pSdb, pTopic);
  }

651
  pShow->numOfRows += numOfRows;
L
Liu Jicong 已提交
652 653 654
  return numOfRows;
}

655 656 657 658 659
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
L
Liu Jicong 已提交
660 661 662 663

  return 0;
}

L
Liu Jicong 已提交
664 665 666 667 668 669 670 671 672
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;

  return 0;
}

L
Liu Jicong 已提交
673 674 675 676
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}
L
Liu Jicong 已提交
677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701

int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = -1;
  SSdb   *pSdb = pMnode->pSdb;

  void        *pIter = NULL;
  SMqTopicObj *pTopic = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid != pDb->uid) {
      sdbRelease(pSdb, pTopic);
      continue;
    }

    if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
      goto END;
    }
  }

  code = 0;
END:
  return code;
}