mndTopic.c 23.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "mndTopic.h"
S
Shengliang Guan 已提交
17
#include "mndAuth.h"
L
Liu Jicong 已提交
18
#include "mndConsumer.h"
L
Liu Jicong 已提交
19 20 21
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
22
#include "mndOffset.h"
L
Liu Jicong 已提交
23
#include "mndShow.h"
S
Shengliang Guan 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndSubscribe.h"
L
Liu Jicong 已提交
26 27 28
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
29
#include "parser.h"
L
Liu Jicong 已提交
30 31
#include "tname.h"

L
Liu Jicong 已提交
32
#define MND_TOPIC_VER_NUMBER   1
S
Shengliang Guan 已提交
33
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
34

L
Liu Jicong 已提交
35 36 37
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
38 39 40
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp);
L
Liu Jicong 已提交
41

S
Shengliang Guan 已提交
42
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
43
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45 46
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);

L
Liu Jicong 已提交
47 48 49 50 51 52 53 54 55
/*
 * Sets up topic support on the mnode: installs the create/drop message
 * handlers and the show-table handlers, then registers the SDB_TOPIC table
 * (binary key) together with its encode/decode/insert/update/delete hooks.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitTopic(SMnode *pMnode) {
  SSdbTable topicTable = {
      .sdbType = SDB_TOPIC,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
      .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
      .insertFp = (SdbInsertFp)mndTopicActionInsert,
      .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
      .deleteFp = (SdbDeleteFp)mndTopicActionDelete,
  };

  /* Request / response handlers. */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);

  /* Show-table handlers for the topics table. */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);

  return sdbSetTable(pMnode->pSdb, topicTable);
}

/* Teardown counterpart of mndInitTopic(); intentionally does nothing. */
void mndCleanupTopic(SMnode *pMnode) {
  (void)pMnode;
}

L
Liu Jicong 已提交
68 69 70 71 72
/*
 * Returns the display name of a topic: the part of `topic` that follows the
 * first '.' character.
 *
 * When `topic` contains no '.', the whole string is returned, so the caller
 * never receives an invalid pointer.
 *
 * The result points into `topic`; nothing is allocated.
 */
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
  const char *dot = strchr(topic, '.');
  if (dot == NULL) {
    return topic;
  }
  return dot + 1;
}

L
Liu Jicong 已提交
73
/*
 * Serializes a topic object into a newly allocated SDB raw record.
 *
 * Field order: name, db, createTime, updateTime, uid, dbUid, version,
 * subType, consumerCnt, sqlLen + sql, astLen + ast, physical plan length +
 * plan, schema length + encoded schema wrapper, and finally
 * MND_TOPIC_RESERVE_SIZE reserved bytes. The ast, plan and schema payloads
 * are written only when their length is non-zero.
 *
 * Returns the raw record on success. On failure the record is freed, terrno
 * is left non-zero and NULL is returned.
 */
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
  /* Pessimistic default: only a completely written record flips this to success. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *schemaBuf = NULL;
  int32_t dataPos = 0;

  /* The plan is stored with its terminating NUL. */
  int32_t planLen = pTopic->physicalPlan ? strlen(pTopic->physicalPlan) + 1 : 0;
  /* Calling the encoder with a NULL buffer only measures the encoded size. */
  int32_t schemaLen = pTopic->schema.nCols ? taosEncodeSSchemaWrapper(NULL, &pTopic->schema) : 0;

  int32_t rawSize =
      sizeof(SMqTopicObj) + planLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto TOPIC_ENCODE_OVER;

  /* Fixed-size header fields. */
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
  SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER);

  /* Length-prefixed variable-size payloads. */
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);

  SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
  if (pTopic->astLen) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
  }

  SDB_SET_INT32(pRaw, dataPos, planLen, TOPIC_ENCODE_OVER);
  if (planLen) {
    SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, planLen, TOPIC_ENCODE_OVER);
  }

  SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
  if (schemaLen) {
    schemaBuf = taosMemoryMalloc(schemaLen);
    if (schemaBuf == NULL) {
      goto TOPIC_ENCODE_OVER;
    }
    /* The encoder is handed a copy of the pointer so schemaBuf keeps the buffer start. */
    void *cursor = schemaBuf;
    taosEncodeSSchemaWrapper(&cursor, &pTopic->schema);
    SDB_SET_BINARY(pRaw, dataPos, schemaBuf, schemaLen, TOPIC_ENCODE_OVER);
  }

  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_ENCODE_OVER:
  if (schemaBuf != NULL) {
    taosMemoryFree(schemaBuf);
  }

  if (terrno == TSDB_CODE_SUCCESS) {
    mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
    return pRaw;
  }

  mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

L
Liu Jicong 已提交
144
/*
 * Rebuilds a topic row from an SDB raw record.
 *
 * Fields are read in the order mndTopicActionEncode() writes them. The sql,
 * ast and physicalPlan strings are allocated here and attached to the row.
 * The ast, plan and schema payloads are read only when their stored length
 * is non-zero.
 *
 * Returns the decoded row, or NULL with terrno set. On failure the row and
 * every string allocated by this function are freed.
 */
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
  /*
   * Everything the error path touches is declared and initialized before the
   * first goto, so an early failure never reads an indeterminate pointer.
   */
  SSdbRow     *pRow = NULL;
  SMqTopicObj *pTopic = NULL;
  void        *schemaBuf = NULL;
  int32_t      len = 0;
  int32_t      dataPos = 0;
  int8_t       sver = 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;

  if (sver != MND_TOPIC_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto TOPIC_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqTopicObj));
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

  pTopic = sdbGetRowObj(pRow);
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;

  /* Known starting state, so the failure path can log and free unconditionally. */
  pTopic->name[0] = '\0';
  pTopic->sql = NULL;
  pTopic->ast = NULL;
  pTopic->physicalPlan = NULL;

  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
  SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER);

  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
  if (pTopic->sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);

  SDB_GET_INT32(pRaw, dataPos, &pTopic->astLen, TOPIC_DECODE_OVER);
  if (pTopic->astLen) {
    pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char));
    if (pTopic->ast == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    /* Read only when present: the encoder writes no ast bytes for a zero length. */
    SDB_GET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_DECODE_OVER);
  }

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  if (len) {
    pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
    if (pTopic->physicalPlan == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
  }

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
  if (len) {
    /* Scratch copy of the encoded schema; released at TOPIC_DECODE_OVER on every path. */
    schemaBuf = taosMemoryMalloc(len);
    if (schemaBuf == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_DECODE_OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, schemaBuf, len, TOPIC_DECODE_OVER);
    if (taosDecodeSSchemaWrapper(schemaBuf, &pTopic->schema) == NULL) {
      goto TOPIC_DECODE_OVER;
    }
  } else {
    pTopic->schema.nCols = 0;
    pTopic->schema.version = 0;
    pTopic->schema.pSchema = NULL;
  }

  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_DECODE_OVER:
  /* NOTE(review): assumes the schema decoder copies out of the buffer and keeps no pointer into it - confirm. */
  taosMemoryFreeClear(schemaBuf);

  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic == NULL ? "null" : pTopic->name, pRaw,
           terrstr());
    if (pTopic != NULL) {
      taosMemoryFreeClear(pTopic->sql);
      taosMemoryFreeClear(pTopic->ast);
      taosMemoryFreeClear(pTopic->physicalPlan);
      /* NOTE(review): schema.pSchema may have been allocated by the schema decoder; confirm who frees it on failure. */
    }
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
  return pRow;
}

L
Liu Jicong 已提交
242
/* SDB insert hook for topic rows: only traces the event and always returns 0. */
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
  const int32_t code = 0;

  mTrace("topic:%s, perform insert action", pTopic->name);

  return code;
}

L
Liu Jicong 已提交
247
/* SDB delete hook for topic rows: only traces the event and always returns 0. */
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
  const int32_t code = 0;

  mTrace("topic:%s, perform delete action", pTopic->name);

  return code;
}

L
Liu Jicong 已提交
252
/*
 * SDB update hook for topic rows.
 *
 * Copies updateTime and version from the new row into the existing one with
 * atomic exchanges; no other field is touched. Always returns 0.
 */
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
  mTrace("topic:%s, perform update action", pOldTopic->name);

  atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

  // TODO handle update of the remaining fields

  return 0;
}

L
Liu Jicong 已提交
267 268 269
/*
 * Acquires the topic stored under `topicName` in the SDB.
 *
 * Returns the topic, or NULL when the lookup fails. A lookup that fails with
 * TSDB_CODE_SDB_OBJ_NOT_THERE is reported through terrno as
 * TSDB_CODE_MND_TOPIC_NOT_EXIST; any other terrno is left untouched.
 * A non-NULL result is handed back with mndReleaseTopic().
 */
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SMqTopicObj *pFound = sdbAcquire(pMnode->pSdb, SDB_TOPIC, topicName);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return NULL;
}

L
Liu Jicong 已提交
276
/* Hands a topic obtained from mndAcquireTopic() back to the SDB via sdbRelease(). */
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
  sdbRelease(pMnode->pSdb, pTopic);
}

L
Liu Jicong 已提交
281
#if 0
/*
 * Disabled code: parses a full topic name, derives the full database name
 * from it and acquires that database.
 */
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
  SName parsed = {0};
  char  dbFName[TSDB_TOPIC_FNAME_LEN] = {0};

  tNameFromString(&parsed, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  tNameGetFullDbName(&parsed, dbFName);

  return mndAcquireDb(pMnode, dbFName);
}
#endif
L
Liu Jicong 已提交
292

S
Shengliang Guan 已提交
293 294
/*
 * Allocates and fills a drop-topic request addressed to `pVgroup`.
 *
 * contLen and vgId are stored with htonl(), the topic uid with htobe64(),
 * and the topic name is copied as TSDB_TOPIC_FNAME_LEN raw bytes.
 *
 * Returns the zero-initialized-then-filled message, or NULL with terrno set
 * to TSDB_CODE_OUT_OF_MEMORY when the allocation fails.
 */
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  int32_t         msgLen = sizeof(SDDropTopicReq);
  SDDropTopicReq *pMsg = taosMemoryCalloc(1, msgLen);

  if (pMsg != NULL) {
    pMsg->head.contLen = htonl(msgLen);
    pMsg->head.vgId = htonl(pVgroup->vgId);
    memcpy(pMsg->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
    pMsg->tuid = htobe64(pTopic->uid);
    return pMsg;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

L
Liu Jicong 已提交
310
/*
 * Validates a create-topic request.
 *
 * The request must carry an sql string. In addition, a column subscription
 * needs a non-empty ast, a table subscription a non-empty subStbName, and a
 * database subscription a non-empty subDbName. Other subscription types
 * have no extra requirement.
 *
 * Returns 0 with terrno = TSDB_CODE_SUCCESS when valid, otherwise -1 with
 * terrno = TSDB_CODE_MND_INVALID_TOPIC.
 */
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
  int32_t valid = (pCreate->sql != NULL);

  if (valid) {
    if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
      valid = (pCreate->ast != NULL && pCreate->ast[0] != 0);
    } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
      valid = (pCreate->subStbName[0] != 0);
    } else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {
      valid = (pCreate->subDbName[0] != 0);
    }
  }

  if (!valid) {
    terrno = TSDB_CODE_MND_INVALID_TOPIC;
    return -1;
  }

  terrno = TSDB_CODE_SUCCESS;
  return 0;
}

S
Shengliang Guan 已提交
327
/*
 * Builds a topic object from a validated create request and submits it in a
 * new transaction.
 *
 * For a column subscription the ast string is parsed, a query plan is
 * created from it, the result schema is extracted and the plan is
 * serialized into topicObj.physicalPlan. For other subscription types ast
 * and physicalPlan stay NULL.
 *
 * The sql, ast and physicalPlan strings are working copies owned by this
 * function; they are released on every exit path.
 *
 * Returns 0 once the transaction has been prepared, otherwise -1 (terrno is
 * set by the failing step, or to TSDB_CODE_OUT_OF_MEMORY when a string copy
 * cannot be allocated).
 */
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
  int32_t     code = -1;
  STrans     *pTrans = NULL;
  SSdbRaw    *pCommitRaw = NULL;
  SMqTopicObj topicObj = {0};

  mDebug("topic:%s to create", pCreate->name);

  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;
  topicObj.subType = pCreate->subType;

  topicObj.sql = strdup(pCreate->sql);
  if (topicObj.sql == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_OBJ_OVER;
  }
  topicObj.sqlLen = strlen(pCreate->sql) + 1;

  if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
    SNode      *pAst = NULL;
    SQueryPlan *pPlan = NULL;

    topicObj.ast = strdup(pCreate->ast);
    if (topicObj.ast == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_OBJ_OVER;
    }
    topicObj.astLen = strlen(pCreate->ast) + 1;

    /* NOTE(review): pAst and pPlan are never destroyed in this function - confirm ownership and release them. */
    if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_OBJ_OVER;
    }

    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_OBJ_OVER;
    }

    /* NOTE(review): topicObj.schema.pSchema is filled here and not freed below - confirm who owns it. */
    if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_OBJ_OVER;
    }

    if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
      mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
      goto CREATE_TOPIC_OBJ_OVER;
    }
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    goto CREATE_TOPIC_OBJ_OVER;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

  /* The commit log carries an encoded copy, so topicObj's strings can be freed afterwards. */
  pCommitRaw = mndTopicActionEncode(&topicObj);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    goto CREATE_TOPIC_OBJ_OVER;
  }
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto CREATE_TOPIC_OBJ_OVER;
  }

  code = 0;

CREATE_TOPIC_OBJ_OVER:
  taosMemoryFreeClear(topicObj.sql);
  taosMemoryFreeClear(topicObj.ast);
  taosMemoryFreeClear(topicObj.physicalPlan);
  if (pTrans != NULL) {
    mndTransDrop(pTrans);
  }
  return code;
}

S
Shengliang Guan 已提交
418 419
/*
 * Handles a create-topic request message.
 *
 * Steps: deserialize and validate the request, make sure no topic with the
 * same name exists (unless igExists is set), acquire the subscribed
 * database and the requesting user, check write permission, then create
 * the topic.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the topic creation has been
 * submitted, 0 when the topic already exists and igExists is set, and
 * otherwise the failing code (normally -1, with terrno describing the error).
 */
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCMCreateTopicReq createTopicReq = {0};
  SMqTopicObj      *pTopic = NULL;
  SDbObj           *pDb = NULL;
  SUserObj         *pUser = NULL;
  int32_t           code = -1;

  if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_TOPIC_OVER;
  }

  mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);

  if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
    goto CREATE_TOPIC_OVER;
  }

  pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
  if (pTopic != NULL) {
    /* The topic already exists: succeed quietly only when the caller asked for that. */
    if (!createTopicReq.igExists) {
      terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
    } else {
      mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
      code = 0;
    }
    goto CREATE_TOPIC_OVER;
  }
  if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
    /* The lookup failed for a reason other than "no such topic". */
    goto CREATE_TOPIC_OVER;
  }

  pDb = mndAcquireDb(pMnode, createTopicReq.subDbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto CREATE_TOPIC_OVER;
  }

  pUser = mndAcquireUser(pMnode, pReq->conn.user);
  if (pUser == NULL || mndCheckWriteAuth(pUser, pDb) != 0) {
    goto CREATE_TOPIC_OVER;
  }

  code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

CREATE_TOPIC_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
  }

  mndReleaseTopic(pMnode, pTopic);
  mndReleaseDb(pMnode, pDb);
  mndReleaseUser(pMnode, pUser);
  tFreeSCMCreateTopicReq(&createTopicReq);

  return code;
}

483
/*
 * Appends a "dropped" commit log for `pTopic` to `pTrans` and prepares the
 * transaction.
 *
 * The transaction is dropped with mndTransDrop() before returning, on
 * success and on failure alike.
 *
 * Returns 0 on success and -1 on failure.
 */
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
  int32_t  code = -1;
  SSdbRaw *pRaw = mndTopicActionEncode(pTopic);

  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
  } else {
    sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

    if (mndTransPrepare(pMnode, pTrans) != 0) {
      mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    } else {
      code = 0;
    }
  }

  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
502
/*
 * Handles a drop-topic request message.
 *
 * The drop is refused with TSDB_CODE_MND_TOPIC_SUBSCRIBED while any
 * consumer (other than those in MQ_CONSUMER_STATUS__LOST_REBD) still lists
 * the topic in its assignedTopics. Otherwise one transaction removes the
 * topic's offsets, its subscriptions and the topic row itself.
 *
 * Every acquired reference (topic, consumers, fetch iterator) and the
 * transaction are released on every exit path.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the drop has been submitted,
 * 0 when the topic does not exist and igNotExists is set, and -1 otherwise
 * with terrno describing the failure.
 */
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  SMDropTopicReq dropReq = {0};

  if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
  if (pTopic == NULL) {
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
      return -1;
    }
  }

  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) {
      /* Skipped consumers still hold a reference from sdbFetch(); give it back. */
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *name = taosArrayGetP(pConsumer->assignedTopics, i);
      if (strcmp(name, pTopic->name) == 0) {
        terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
        /* Log before releasing: the consumer must not be read once its reference is gone. */
        mError("topic:%s, failed to drop since subscribed by consumer %ld from cgroup %s", dropReq.name,
               pConsumer->consumerId, pConsumer->cgroup);
        mndReleaseConsumer(pMnode, pConsumer);
        /* Leaving the loop early: the iterator has to be cancelled explicitly. */
        sdbCancelFetch(pSdb, pIter);
        mndReleaseTopic(pMnode, pTopic);
        return -1;
      }
    }
    sdbRelease(pSdb, pConsumer);
  }

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
  if (pTrans == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    mndReleaseTopic(pMnode, pTopic);
    return -1;
  }

  mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);

  if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
    ASSERT(0);
    mndTransDrop(pTrans);
    mndReleaseTopic(pMnode, pTopic);
    return -1;
  }

  if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
    ASSERT(0);
    mndTransDrop(pTrans);
    mndReleaseTopic(pMnode, pTopic);
    return -1;
  }

  /* mndDropTopic() drops pTrans itself on every path, so it is not dropped again here. */
  int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    /* terrno is deliberately left alone: `code` is only -1 and would hide the real error. */
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
    return -1;
  }

  return TSDB_CODE_ACTION_IN_PROGRESS;
}

S
Shengliang Guan 已提交
585
/*
 * Handler for TDMT_VND_DROP_TOPIC_RSP: passes the response on to
 * mndTransProcessRsp(), ignores its result and always returns 0.
 */
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp) {
  const int32_t code = 0;

  mndTransProcessRsp(pRsp);

  return code;
}

L
Liu Jicong 已提交
590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616
/*
 * Counts the topics whose dbUid matches the database named `dbName`.
 *
 * Returns 0 and stores the count in *pNumOfTopics, or -1 with terrno set to
 * TSDB_CODE_MND_DB_NOT_SELECTED when the database cannot be acquired (in
 * which case *pNumOfTopics is not written).
 */
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t matched = 0;
  void   *pIter = NULL;
  for (;;) {
    SMqTopicObj *pTopic = NULL;
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) {
      break;
    }

    if (pTopic->dbUid == pDb->uid) {
      matched += 1;
    }

    /* Each fetched topic is released again before moving on. */
    sdbRelease(pSdb, pTopic);
  }

  *pNumOfTopics = matched;
  mndReleaseDb(pMnode, pDb);
  return 0;
}

S
Shengliang Guan 已提交
617 618
/*
 * Show-retrieve handler for the topics table.
 *
 * Continues the SDB iteration kept in pShow->pIter and appends one row per
 * topic to `pBlock` until `rowsCapacity` rows were produced or the
 * iteration ends. Columns, in order: topic name, database name, creation
 * time, and the topic's sql text (copied with a TSDB_SHOW_SQL_LEN bound).
 *
 * Adds the number of produced rows to pShow->numOfRows and returns it.
 */
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode      *pMnode = pReq->info.node;
  SSdb        *pSdb = pMnode->pSdb;
  SMqTopicObj *pTopic = NULL;
  int32_t      rowIdx = 0;

  for (; rowIdx < rowsCapacity; rowIdx++) {
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) break;

    SColumnInfoData *pCol = NULL;
    SName            parsed;
    int32_t          colIdx = 0;

    /* Column: topic name, taken from the topic's full name through the name parser. */
    char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&parsed, pTopic->name, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&parsed, varDataVal(topicName));
    varDataSetLen(topicName, strlen(varDataVal(topicName)));
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pCol, rowIdx, (const char *)topicName, false);

    /* Column: database name. */
    char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&parsed, pTopic->db, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&parsed, varDataVal(dbName));
    varDataSetLen(dbName, strlen(varDataVal(dbName)));
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pCol, rowIdx, (const char *)dbName, false);

    /* Column: creation time. */
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pCol, rowIdx, (const char *)&pTopic->createTime, false);

    /* Column: sql text, placed after the var-string header. */
    char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
    varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pCol, rowIdx, (const char *)sql, false);

    sdbRelease(pSdb, pTopic);
  }

  pShow->numOfRows += rowIdx;
  return rowIdx;
}

662 663 664 665 666
/*
 * Encodes `pTopic`, appends the record to `pTrans` as a commit log and
 * marks it SDB_STATUS_READY.
 *
 * Returns 0 on success, -1 as soon as any of the three steps fails.
 */
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  SSdbRaw *pRaw = mndTopicActionEncode(pTopic);

  /* Short-circuit evaluation keeps the steps in order and stops at the first failure. */
  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
671 672 673 674 675 676 677 678 679
/*
 * Encodes `pTopic`, appends the record to `pTrans` as a commit log and
 * marks it SDB_STATUS_DROPPED.
 *
 * Returns 0 on success, -1 as soon as any of the three steps fails.
 */
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
  SSdbRaw *pRaw = mndTopicActionEncode(pTopic);

  /* Short-circuit evaluation keeps the steps in order and stops at the first failure. */
  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
680 681 682 683
/* Show free-iterator handler for the topics table: cancels the SDB fetch behind `pIter`. */
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}
L
Liu Jicong 已提交
684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708

/*
 * Appends a "dropped" commit log to `pTrans` for every topic whose dbUid
 * matches `pDb`.
 *
 * Each topic fetched from the SDB is released again, whether it matched or
 * not. If appending fails, the current topic is released and the iteration
 * is cancelled before returning.
 *
 * Returns 0 on success and -1 when a commit log could not be appended.
 */
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb        *pSdb = pMnode->pSdb;
  void        *pIter = NULL;
  SMqTopicObj *pTopic = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->dbUid == pDb->uid && mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
      sdbRelease(pSdb, pTopic);
      /* Leaving the loop early: the iterator has to be cancelled explicitly. */
      sdbCancelFetch(pSdb, pIter);
      return -1;
    }

    sdbRelease(pSdb, pTopic);
  }

  return 0;
}