mndTopic.c 19.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndTopic.h"
S
Shengliang Guan 已提交
18
#include "mndAuth.h"
L
Liu Jicong 已提交
19 20 21 22
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
23
#include "mndStb.h"
L
Liu Jicong 已提交
24 25 26 27 28
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tname.h"

S
Shengliang Guan 已提交
29 30
#define MND_TOPIC_VER_NUMBER 1
#define MND_TOPIC_RESERVE_SIZE 64
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32 33 34
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
S
Shengliang Guan 已提交
35 36 37
static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq);
static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp);
S
Shengliang Guan 已提交
38
static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq);
S
Shengliang Guan 已提交
39 40
static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42 43 44 45 46 47 48 49 50 51

int32_t mndInitTopic(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_TOPIC,
                     .keyType = SDB_KEY_BINARY,
                     .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
                     .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
                     .insertFp = (SdbInsertFp)mndTopicActionInsert,
                     .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndTopicActionDelete};

S
Shengliang Guan 已提交
52 53
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
H
Hongze Cheng 已提交
54
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
L
Liu Jicong 已提交
55

S
Shengliang Guan 已提交
56 57 58 59
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetTopicMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextTopic);

L
Liu Jicong 已提交
60 61 62 63 64
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupTopic(SMnode *pMnode) {}

L
Liu Jicong 已提交
65
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
66 67
  terrno = TSDB_CODE_OUT_OF_MEMORY;

L
Liu Jicong 已提交
68 69
  int32_t  logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
  int32_t  physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
L
Liu Jicong 已提交
70
  int32_t  size = sizeof(SMqTopicObj) + logicalPlanLen + physicalPlanLen + pTopic->sqlLen + MND_TOPIC_RESERVE_SIZE;
S
Shengliang Guan 已提交
71
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
L
Liu Jicong 已提交
72
  if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
L
Liu Jicong 已提交
73 74

  int32_t dataPos = 0;
L
Liu Jicong 已提交
75 76 77 78 79 80 81 82 83
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
84
  SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER);
85
  SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
86
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
87
  SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
L
Liu Jicong 已提交
88 89 90 91 92 93 94 95 96 97 98 99 100 101

  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

TOPIC_ENCODE_OVER:
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
102 103 104
  return pRaw;
}

L
Liu Jicong 已提交
105
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
106
  terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
107
  int8_t sver = 0;
108
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
109

S
Shengliang Guan 已提交
110
  if (sver != MND_TOPIC_VER_NUMBER) {
L
Liu Jicong 已提交
111
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
112
    goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
113 114
  }

L
Liu Jicong 已提交
115 116
  int32_t  size = sizeof(SMqTopicObj);
  SSdbRow *pRow = sdbAllocRow(size);
117 118
  if (pRow == NULL) goto TOPIC_DECODE_OVER;

L
Liu Jicong 已提交
119
  SMqTopicObj *pTopic = sdbGetRowObj(pRow);
120
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;
L
Liu Jicong 已提交
121

L
Liu Jicong 已提交
122
  int32_t len;
L
Liu Jicong 已提交
123
  int32_t dataPos = 0;
L
Liu Jicong 已提交
124 125 126 127 128 129 130 131
  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
132 133

  pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char));
L
Liu Jicong 已提交
134
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
135 136

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
137
  pTopic->logicalPlan = calloc(len + 1, sizeof(char));
138 139 140 141
  if (pTopic->logicalPlan == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
L
Liu Jicong 已提交
142
  SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
143 144

  SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
L
Liu Jicong 已提交
145
  pTopic->physicalPlan = calloc(len + 1, sizeof(char));
146 147 148 149 150
  if (pTopic->physicalPlan == NULL) {
    free(pTopic->logicalPlan);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto TOPIC_DECODE_OVER;
  }
L
Liu Jicong 已提交
151
  SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
152

L
Liu Jicong 已提交
153
  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
154

L
Liu Jicong 已提交
155
  terrno = TSDB_CODE_SUCCESS;
156 157

TOPIC_DECODE_OVER:
L
Liu Jicong 已提交
158
  if (terrno != TSDB_CODE_SUCCESS) {
159 160 161 162
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }
L
Liu Jicong 已提交
163

164
  mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
L
Liu Jicong 已提交
165 166 167
  return pRow;
}

L
Liu Jicong 已提交
168
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
169 170 171 172
  mTrace("topic:%s, perform insert action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
173
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
174 175 176 177
  mTrace("topic:%s, perform delete action", pTopic->name);
  return 0;
}

L
Liu Jicong 已提交
178
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
L
Liu Jicong 已提交
179 180 181 182 183
  mTrace("topic:%s, perform update action", pOldTopic->name);
  atomic_exchange_32(&pOldTopic->updateTime, pNewTopic->updateTime);
  atomic_exchange_32(&pOldTopic->version, pNewTopic->version);

  taosWLockLatch(&pOldTopic->lock);
L
Liu Jicong 已提交
184 185

  // TODO handle update
L
Liu Jicong 已提交
186 187 188 189 190

  taosWUnLockLatch(&pOldTopic->lock);
  return 0;
}

L
Liu Jicong 已提交
191 192 193
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
  SSdb        *pSdb = pMnode->pSdb;
  SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
S
Shengliang Guan 已提交
194
  if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
L
Liu Jicong 已提交
195 196 197 198 199
    terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
  }
  return pTopic;
}

L
Liu Jicong 已提交
200
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
L
Liu Jicong 已提交
201 202 203 204 205 206
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pTopic);
}

static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
  SName name = {0};
207
  tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
L
Liu Jicong 已提交
208 209 210 211 212 213 214

  char db[TSDB_TABLE_FNAME_LEN] = {0};
  tNameGetFullDbName(&name, db);

  return mndAcquireDb(pMnode, db);
}

S
Shengliang Guan 已提交
215 216
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
  int32_t contLen = sizeof(SDDropTopicReq);
L
Liu Jicong 已提交
217

S
Shengliang Guan 已提交
218
  SDDropTopicReq *pDrop = calloc(1, contLen);
L
Liu Jicong 已提交
219 220 221 222 223 224 225 226 227 228 229 230 231
  if (pDrop == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pDrop->head.contLen = htonl(contLen);
  pDrop->head.vgId = htonl(pVgroup->vgId);
  memcpy(pDrop->name, pTopic->name, TSDB_TABLE_FNAME_LEN);
  pDrop->tuid = htobe64(pTopic->uid);

  return pDrop;
}

S
Shengliang Guan 已提交
232 233 234 235 236
static int32_t mndCheckCreateTopicReq(SMCreateTopicReq *pCreate) {
  if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
    terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
    return -1;
  }
L
Liu Jicong 已提交
237 238 239
  return 0;
}

S
Shengliang Guan 已提交
240
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq *pCreate, SDbObj *pDb) {
L
Liu Jicong 已提交
241
  mDebug("topic:%s to create", pCreate->name);
L
Liu Jicong 已提交
242
  SMqTopicObj topicObj = {0};
243
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
244
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
245 246
  topicObj.createTime = taosGetTimestampMs();
  topicObj.updateTime = topicObj.createTime;
L
Liu Jicong 已提交
247
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
L
Liu Jicong 已提交
248 249
  topicObj.dbUid = pDb->uid;
  topicObj.version = 1;
L
Liu Jicong 已提交
250 251 252
  topicObj.sql = pCreate->sql;
  topicObj.physicalPlan = pCreate->physicalPlan;
  topicObj.logicalPlan = pCreate->logicalPlan;
253
  topicObj.sqlLen = strlen(pCreate->sql);
L
Liu Jicong 已提交
254

S
Shengliang Guan 已提交
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
  if (pTrans == NULL) {
    mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }
  mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);

  SSdbRaw *pRedoRaw = mndTopicActionEncode(&topicObj);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }

  mndTransDrop(pTrans);
  return 0;
L
Liu Jicong 已提交
278 279
}

S
Shengliang Guan 已提交
280
static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) {
S
Shengliang Guan 已提交
281 282 283 284 285
  SMnode          *pMnode = pReq->pMnode;
  int32_t          code = -1;
  SMqTopicObj     *pTopic = NULL;
  SDbObj          *pDb = NULL;
  SUserObj        *pUser = NULL;
S
Shengliang Guan 已提交
286
  SMCreateTopicReq createTopicReq = {0};
S
Shengliang Guan 已提交
287 288 289 290 291

  if (tDeserializeSMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_TOPIC_OVER;
  }
292 293

  mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
L
Liu Jicong 已提交
294

S
Shengliang Guan 已提交
295
  if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
296
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
S
Shengliang Guan 已提交
297
    goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
298 299
  }

S
Shengliang Guan 已提交
300
  pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
L
Liu Jicong 已提交
301
  if (pTopic != NULL) {
302 303
    if (createTopicReq.igExists) {
      mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
S
Shengliang Guan 已提交
304 305
      code = 0;
      goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
306 307
    } else {
      terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
S
Shengliang Guan 已提交
308
      goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
309
    }
S
Shengliang Guan 已提交
310 311
  } else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
    goto CREATE_TOPIC_OVER;
L
Liu Jicong 已提交
312 313
  }

S
Shengliang Guan 已提交
314
  pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name);
L
Liu Jicong 已提交
315 316
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
S
Shengliang Guan 已提交
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
    goto CREATE_TOPIC_OVER;
  }

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    goto CREATE_TOPIC_OVER;
  }

  if (mndCheckWriteAuth(pUser, pDb) != 0) {
    goto CREATE_TOPIC_OVER;
  }

  code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
  if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

CREATE_TOPIC_OVER:
  if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
334
    mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
L
Liu Jicong 已提交
335 336
  }

S
Shengliang Guan 已提交
337
  mndReleaseTopic(pMnode, pTopic);
L
Liu Jicong 已提交
338
  mndReleaseDb(pMnode, pDb);
S
Shengliang Guan 已提交
339
  mndReleaseUser(pMnode, pUser);
L
Liu Jicong 已提交
340

S
Shengliang Guan 已提交
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
  tFreeSMCreateTopicReq(&createTopicReq);
  return code;
}

static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) {
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
  if (pTrans == NULL) {
    mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
    return -1;
  }
  mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);

  SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
L
Liu Jicong 已提交
364 365 366
    return -1;
  }

S
Shengliang Guan 已提交
367
  mndTransDrop(pTrans);
L
Liu Jicong 已提交
368
  return 0;
L
Liu Jicong 已提交
369 370
}

S
Shengliang Guan 已提交
371 372 373
static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq) {
  SMnode        *pMnode = pReq->pMnode;
  SMDropTopicReq dropReq = {0};
L
Liu Jicong 已提交
374

S
Shengliang Guan 已提交
375 376 377 378 379 380
  if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mDebug("topic:%s, start to drop", dropReq.name);
L
Liu Jicong 已提交
381

S
Shengliang Guan 已提交
382
  SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
L
Liu Jicong 已提交
383
  if (pTopic == NULL) {
S
Shengliang Guan 已提交
384 385
    if (dropReq.igNotExists) {
      mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
L
Liu Jicong 已提交
386 387 388
      return 0;
    } else {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
S
Shengliang Guan 已提交
389
      mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
L
Liu Jicong 已提交
390 391 392 393
      return -1;
    }
  }

S
Shengliang Guan 已提交
394
  int32_t code = mndDropTopic(pMnode, pReq, pTopic);
L
Liu Jicong 已提交
395 396 397 398
  mndReleaseTopic(pMnode, pTopic);

  if (code != 0) {
    terrno = code;
S
Shengliang Guan 已提交
399
    mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
L
Liu Jicong 已提交
400 401 402 403 404 405
    return -1;
  }

  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang Guan 已提交
406 407
static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);
L
Liu Jicong 已提交
408 409 410
  return 0;
}

S
Shengliang Guan 已提交
411
static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq) {
S
Shengliang Guan 已提交
412 413 414 415 416 417 418
  SMnode       *pMnode = pReq->pMnode;
  STableInfoReq infoReq = {0};

  if (tSerializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
L
Liu Jicong 已提交
419

S
Shengliang Guan 已提交
420
  mDebug("topic:%s, start to retrieve meta", infoReq.tbName);
L
Liu Jicong 已提交
421

L
Liu Jicong 已提交
422
#if 0
L
Liu Jicong 已提交
423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
  SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    mError("topic:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
    return -1;
  }

  STopicObj *pTopic = mndAcquireTopic(pMnode, pInfo->tableFname);
  if (pTopic == NULL) {
    mndReleaseDb(pMnode, pDb);
    terrno = TSDB_CODE_MND_INVALID_TOPIC;
    mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
    return -1;
  }

  taosRLockLatch(&pTopic->lock);
  int32_t totalCols = pTopic->numOfColumns + pTopic->numOfTags;
S
Shengliang Guan 已提交
440
  int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
L
Liu Jicong 已提交
441

S
Shengliang Guan 已提交
442
  STableMetaRsp *pMeta = rpcMallocCont(contLen);
L
Liu Jicong 已提交
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
  if (pMeta == NULL) {
    taosRUnLockLatch(&pTopic->lock);
    mndReleaseDb(pMnode, pDb);
    mndReleaseTopic(pMnode, pTopic);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
    return -1;
  }

  memcpy(pMeta->topicFname, pTopic->name, TSDB_TABLE_FNAME_LEN);
  pMeta->numOfTags = htonl(pTopic->numOfTags);
  pMeta->numOfColumns = htonl(pTopic->numOfColumns);
  pMeta->precision = pDb->cfg.precision;
  pMeta->tableType = TSDB_SUPER_TABLE;
  pMeta->update = pDb->cfg.update;
  pMeta->sversion = htonl(pTopic->version);
  pMeta->tuid = htonl(pTopic->uid);

  for (int32_t i = 0; i < totalCols; ++i) {
S
Shengliang Guan 已提交
462
    SSchema *pSchema = &pMeta->pSchemas[i];
L
Liu Jicong 已提交
463 464 465 466 467 468 469 470 471 472
    SSchema *pSrcSchema = &pTopic->pSchema[i];
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
    pSchema->type = pSrcSchema->type;
    pSchema->colId = htonl(pSrcSchema->colId);
    pSchema->bytes = htonl(pSrcSchema->bytes);
  }
  taosRUnLockLatch(&pTopic->lock);
  mndReleaseDb(pMnode, pDb);
  mndReleaseTopic(pMnode, pTopic);

S
Shengliang Guan 已提交
473 474
  pReq->pCont = pMeta;
  pReq->contLen = contLen;
L
Liu Jicong 已提交
475 476 477 478 479 480 481

  mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags);
#endif
  return 0;
}

static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
S
Shengliang Guan 已提交
482
  SSdb   *pSdb = pMnode->pSdb;
L
Liu Jicong 已提交
483 484 485 486 487 488 489 490 491
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t numOfTopics = 0;
  void   *pIter = NULL;
  while (1) {
L
Liu Jicong 已提交
492
    SMqTopicObj *pTopic = NULL;
L
Liu Jicong 已提交
493 494 495
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
496 497 498
    if (pTopic->dbUid == pDb->uid) {
      numOfTopics++;
    }
L
Liu Jicong 已提交
499 500 501 502 503

    sdbRelease(pSdb, pTopic);
  }

  *pNumOfTopics = numOfTopics;
S
Shengliang Guan 已提交
504
  mndReleaseDb(pMnode, pDb);
L
Liu Jicong 已提交
505 506 507
  return 0;
}

S
Shengliang Guan 已提交
508 509
static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode *pMnode = pReq->pMnode;
L
Liu Jicong 已提交
510 511 512 513 514 515 516
  SSdb   *pSdb = pMnode->pSdb;

  if (mndGetNumOfTopics(pMnode, pShow->db, &pShow->numOfRows) != 0) {
    return -1;
  }

  int32_t  cols = 0;
S
Shengliang Guan 已提交
517
  SSchema *pSchema = pMeta->pSchemas;
L
Liu Jicong 已提交
518 519 520 521

  pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "name");
S
Shengliang Guan 已提交
522
  pSchema[cols].bytes = pShow->bytes[cols];
L
Liu Jicong 已提交
523 524 525 526 527
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "create_time");
S
Shengliang Guan 已提交
528
  pSchema[cols].bytes = pShow->bytes[cols];
L
Liu Jicong 已提交
529 530
  cols++;

S
Shengliang Guan 已提交
531 532 533 534 535 536
  pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

S
Shengliang Guan 已提交
537
  pMeta->numOfColumns = cols;
L
Liu Jicong 已提交
538 539 540 541 542 543 544 545 546
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = sdbGetSize(pSdb, SDB_TOPIC);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
547
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
L
Liu Jicong 已提交
548 549 550 551

  return 0;
}

S
Shengliang Guan 已提交
552 553
static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode      *pMnode = pReq->pMnode;
L
Liu Jicong 已提交
554 555 556 557 558
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      numOfRows = 0;
  SMqTopicObj *pTopic = NULL;
  int32_t      cols = 0;
  char        *pWrite;
S
Shengliang Guan 已提交
559 560 561 562
  char         prefix[TSDB_DB_FNAME_LEN] = {0};

  SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
  if (pDb == NULL) return 0;
L
Liu Jicong 已提交
563

S
Shengliang Guan 已提交
564
  tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
565 566 567 568 569 570 571
  strcat(prefix, TS_PATH_DELIMITER);
  int32_t prefixLen = (int32_t)strlen(prefix);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
    if (pShow->pIter == NULL) break;

S
Shengliang Guan 已提交
572 573 574 575 576
    if (pTopic->dbUid != pDb->uid) {
      if (strncmp(pTopic->name, prefix, prefixLen) != 0) {
        mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
      }

L
Liu Jicong 已提交
577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592
      sdbRelease(pSdb, pTopic);
      continue;
    }

    cols = 0;

    char topicName[TSDB_TABLE_NAME_LEN] = {0};
    tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TABLE_NAME_LEN);
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_TO_VARSTR(pWrite, topicName);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int64_t *)pWrite = pTopic->createTime;
    cols++;

S
Shengliang Guan 已提交
593 594 595 596
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]);
    cols++;

L
Liu Jicong 已提交
597 598 599 600
    numOfRows++;
    sdbRelease(pSdb, pTopic);
  }

S
Shengliang Guan 已提交
601
  mndReleaseDb(pMnode, pDb);
L
Liu Jicong 已提交
602 603 604 605 606 607 608 609 610
  pShow->numOfReads += numOfRows;
  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  return numOfRows;
}

static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}