mndSnode.c 16.5 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndSnode.h"
S
Shengliang Guan 已提交
18
#include "mndAuth.h"
S
Shengliang Guan 已提交
19 20 21
#include "mndDnode.h"
#include "mndShow.h"
#include "mndTrans.h"
S
Shengliang Guan 已提交
22
#include "mndUser.h"
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24
#define TSDB_SNODE_VER_NUMBER   1
S
Shengliang Guan 已提交
25 26 27 28 29 30
#define TSDB_SNODE_RESERVE_SIZE 64

static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj);
static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw);
static int32_t  mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj);
static int32_t  mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj);
S
Shengliang Guan 已提交
31
static int32_t  mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew);
S
Shengliang Guan 已提交
32 33 34 35 36 37
static int32_t  mndProcessCreateSnodeReq(SNodeMsg *pReq);
static int32_t  mndProcessDropSnodeReq(SNodeMsg *pReq);
static int32_t  mndProcessCreateSnodeRsp(SNodeMsg *pRsp);
static int32_t  mndProcessDropSnodeRsp(SNodeMsg *pRsp);
static int32_t  mndGetSnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t  mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
static void     mndCancelGetNextSnode(SMnode *pMnode, void *pIter);

/*
 * Wire the snode module into the mnode: registers the create/drop request and
 * response handlers, the three "show" handlers for TSDB_MGMT_TABLE_SNODE, and
 * finally the SDB_SNODE table description.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitSnode(SMnode *pMnode) {
  SSdbTable snodeTable = {0};
  snodeTable.sdbType = SDB_SNODE;
  snodeTable.keyType = SDB_KEY_INT32;
  snodeTable.encodeFp = (SdbEncodeFp)mndSnodeActionEncode;
  snodeTable.decodeFp = (SdbDecodeFp)mndSnodeActionDecode;
  snodeTable.insertFp = (SdbInsertFp)mndSnodeActionInsert;
  snodeTable.updateFp = (SdbUpdateFp)mndSnodeActionUpdate;
  snodeTable.deleteFp = (SdbDeleteFp)mndSnodeActionDelete;

  // client requests
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SNODE, mndProcessCreateSnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_SNODE, mndProcessDropSnodeReq);
  // responses to the TDMT_DND_* actions queued by the transactions below
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_SNODE_RSP, mndProcessCreateSnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_SNODE_RSP, mndProcessDropSnodeRsp);

  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndGetSnodeMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndRetrieveSnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndCancelGetNextSnode);

  return sdbSetTable(pMnode->pSdb, snodeTable);
}

/* Counterpart of mndInitSnode(); performs no work. */
void mndCleanupSnode(SMnode *pMnode) {
  (void)pMnode;
}

L
Liu Jicong 已提交
63 64 65 66 67 68 69 70 71
/*
 * Build an endpoint set that contains exactly one endpoint: the fqdn/port of
 * the dnode referenced by pSnode->pDnode.
 *
 * pMnode is not used. pSnode and pSnode->pDnode must be non-NULL.
 * The set is returned by value with numOfEps = 1 and inUse = 0.
 */
SEpSet mndAcquireEpFromSnode(SMnode *pMnode, const SSnodeObj *pSnode) {
  // Zero the whole struct first: it is returned by value, and only the first
  // endpoint is filled in below, so the rest would otherwise be indeterminate.
  SEpSet epSet = {0};

  // NOTE(review): 128 is presumably the size of both fqdn buffers — confirm
  // against their declarations (a named constant or sizeof would be safer).
  memcpy(epSet.eps->fqdn, pSnode->pDnode->fqdn, 128);
  epSet.eps->port = pSnode->pDnode->port;
  epSet.numOfEps = 1;
  epSet.inUse = 0;
  return epSet;
}

S
Shengliang Guan 已提交
72
/*
 * Look up the snode keyed by snodeId in the SDB_SNODE table via sdbAcquire().
 *
 * Returns the object, or NULL on failure. When the lookup failed with
 * TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 * TSDB_CODE_MND_SNODE_NOT_EXIST; any other terrno is left as is.
 * A non-NULL result is handed back with mndReleaseSnode().
 */
static SSnodeObj *mndAcquireSnode(SMnode *pMnode, int32_t snodeId) {
  SSnodeObj *pSnode = sdbAcquire(pMnode->pSdb, SDB_SNODE, &snodeId);
  if (pSnode != NULL) {
    return pSnode;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_SNODE_NOT_EXIST;
  }
  return NULL;
}

/* Hand an snode object back to the sdb; thin wrapper around sdbRelease(). */
static void mndReleaseSnode(SMnode *pMnode, SSnodeObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}

/*
 * Serialize an snode object into a newly allocated raw sdb record.
 *
 * Written in order: id (int32), createdTime (int64), updateTime (int64),
 * then TSDB_SNODE_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record, or NULL on failure (the record is freed and terrno
 * is left non-zero).
 */
static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj) {
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  // terrno doubles as the success flag: it stays non-zero until all fields are written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_SNODE, TSDB_SNODE_VER_NUMBER, sizeof(SSnodeObj) + TSDB_SNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto SNODE_ENCODE_OVER;

  // Each SDB_SET_* macro is given the label to jump to on failure.
  SDB_SET_INT32(pRaw, dataPos, pObj->id, SNODE_ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, SNODE_ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, SNODE_ENCODE_OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE, SNODE_ENCODE_OVER)

  terrno = 0;

SNODE_ENCODE_OVER:
  if (terrno == 0) {
    mTrace("snode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("snode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

/*
 * Rebuild an snode row from a raw sdb record.
 *
 * The record's soft version must equal TSDB_SNODE_VER_NUMBER; otherwise terrno
 * is set to TSDB_CODE_SDB_INVALID_DATA_VER. Fields are read in the order
 * mndSnodeActionEncode() wrote them: id, createdTime, updateTime, reserve.
 *
 * Returns the new row, or NULL on failure (the row, if any, is freed and
 * terrno is left non-zero).
 */
static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) {
  // Declared and NULL-initialized before the first goto: the error path below
  // inspects both pointers, and jumping over their initializers would leave
  // them indeterminate.
  SSdbRow   *pRow = NULL;
  SSnodeObj *pObj = NULL;
  int8_t     sver = 0;
  int32_t    dataPos = 0;

  // terrno doubles as the success flag: it stays non-zero until decoding is complete.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SNODE_DECODE_OVER;

  if (sver != TSDB_SNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SNODE_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SSnodeObj));
  if (pRow == NULL) goto SNODE_DECODE_OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto SNODE_DECODE_OVER;

  // Each SDB_GET_* macro is given the label to jump to on failure.
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, SNODE_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, SNODE_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, SNODE_DECODE_OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE, SNODE_DECODE_OVER)

  terrno = 0;

SNODE_DECODE_OVER:
  if (terrno != 0) {
    // pObj is still NULL when the failure happened before the row was allocated.
    mError("snode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }

  mTrace("snode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRow;
}

/*
 * sdb insert callback: bind the snode to the dnode that has the same id by
 * acquiring it from SDB_DNODE and storing it in pObj->pDnode.
 *
 * Returns 0 on success; -1 with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST when
 * the dnode cannot be acquired.
 */
static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj) {
  mTrace("snode:%d, perform insert action, row:%p", pObj->id, pObj);

  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  mError("snode:%d, failed to perform insert action since %s", pObj->id, terrstr());
  return -1;
}

/*
 * sdb delete callback: release the dnode reference held in pObj->pDnode (if
 * any) and clear the pointer. Always returns 0.
 */
static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) {
  mTrace("snode:%d, perform delete action, row:%p", pObj->id, pObj);

  if (pObj->pDnode == NULL) {
    return 0;
  }

  sdbRelease(pSdb, pObj->pDnode);
  pObj->pDnode = NULL;
  return 0;
}

S
Shengliang Guan 已提交
168
/*
 * sdb update callback: the only field carried over from the new row to the
 * existing one is updateTime. pSdb is not used. Always returns 0.
 */
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew) {
  mTrace("snode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;

  return 0;
}

/*
 * Encode pObj, append it to the transaction's redo log, then mark the raw
 * record SDB_STATUS_CREATING. Returns 0 on success, -1 if any step fails.
 *
 * NOTE(review): when the append fails the encoded record is not freed here —
 * confirm whether mndTransAppendRedolog() takes ownership on failure.
 */
static int32_t mndSetCreateSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) {
  SSdbRaw *pRaw = mndSnodeActionEncode(pObj);
  if (pRaw == NULL) {
    return -1;
  }

  // || short-circuits, so the status is only set after a successful append.
  if (mndTransAppendRedolog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_CREATING) != 0) {
    return -1;
  }
  return 0;
}

182 183 184 185 186 187 188 189
/*
 * Encode pObj, append it to the transaction's undo log, then mark the raw
 * record SDB_STATUS_DROPPED. Returns 0 on success, -1 if any step fails.
 *
 * NOTE(review): when the append fails the encoded record is not freed here —
 * confirm whether mndTransAppendUndolog() takes ownership on failure.
 */
static int32_t mndSetCreateSnodeUndoLogs(STrans *pTrans, SSnodeObj *pObj) {
  SSdbRaw *pRaw = mndSnodeActionEncode(pObj);
  if (pRaw == NULL) {
    return -1;
  }

  // || short-circuits, so the status is only set after a successful append.
  if (mndTransAppendUndolog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
190 191 192 193 194 195 196 197 198
/*
 * Encode pObj, append it to the transaction's commit log, then mark the raw
 * record SDB_STATUS_READY. Returns 0 on success, -1 if any step fails.
 *
 * NOTE(review): when the append fails the encoded record is not freed here —
 * confirm whether mndTransAppendCommitlog() takes ownership on failure.
 */
static int32_t mndSetCreateSnodeCommitLogs(STrans *pTrans, SSnodeObj *pObj) {
  SSdbRaw *pRaw = mndSnodeActionEncode(pObj);
  if (pRaw == NULL) {
    return -1;
  }

  // || short-circuits, so the status is only set after a successful append.
  if (mndTransAppendCommitlog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }
  return 0;
}

/*
 * Queue the redo action of a create-snode transaction: a TDMT_DND_CREATE_SNODE
 * message addressed to pDnode, for which TSDB_CODE_NODE_ALREADY_DEPLOYED is an
 * acceptable reply code.
 *
 * pObj is not used. Returns 0 on success; -1 when the message buffer cannot be
 * allocated (terrno = TSDB_CODE_OUT_OF_MEMORY) or the action cannot be
 * appended (the buffer is freed in that case).
 */
static int32_t mndSetCreateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj) {
  SDCreateSnodeReq createReq = {.dnodeId = pDnode->id};

  // The call with a NULL buffer supplies the size passed to malloc().
  // NOTE(review): the serializer's return values are not checked — confirm it cannot fail here.
  int32_t bufLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
  void   *pBuf = malloc(bufLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSMCreateDropQSBNodeReq(pBuf, bufLen, &createReq);

  STransAction action = {.epSet = mndGetDnodeEpset(pDnode),
                         .pCont = pBuf,
                         .contLen = bufLen,
                         .msgType = TDMT_DND_CREATE_SNODE,
                         .acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED};

  if (mndTransAppendRedoAction(pTrans, &action) == 0) {
    return 0;
  }

  free(pBuf);
  return -1;
}

225
/*
 * Queue the undo action of a create-snode transaction: a TDMT_DND_DROP_SNODE
 * message addressed to pDnode, for which TSDB_CODE_NODE_NOT_DEPLOYED is an
 * acceptable reply code.
 *
 * pObj is not used. Returns 0 on success; -1 when the message buffer cannot be
 * allocated (terrno = TSDB_CODE_OUT_OF_MEMORY) or the action cannot be
 * appended (the buffer is freed in that case).
 */
static int32_t mndSetCreateSnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj) {
  SDDropSnodeReq dropReq = {.dnodeId = pDnode->id};

  // The call with a NULL buffer supplies the size passed to malloc().
  // NOTE(review): the serializer's return values are not checked — confirm it cannot fail here.
  int32_t bufLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &dropReq);
  void   *pBuf = malloc(bufLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSMCreateDropQSBNodeReq(pBuf, bufLen, &dropReq);

  STransAction action = {.epSet = mndGetDnodeEpset(pDnode),
                         .pCont = pBuf,
                         .contLen = bufLen,
                         .msgType = TDMT_DND_DROP_SNODE,
                         .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED};

  if (mndTransAppendUndoAction(pTrans, &action) == 0) {
    return 0;
  }

  free(pBuf);
  return -1;
}

S
Shengliang Guan 已提交
252
/*
 * Build and prepare the transaction that creates an snode on pDnode.
 *
 * The new snode takes the dnode's id; createdTime and updateTime are both set
 * to the current timestamp. The transaction uses TRN_POLICY_ROLLBACK and is
 * given redo/undo/commit logs plus redo/undo actions before mndTransPrepare().
 * pCreate is only used for the debug log.
 *
 * Returns 0 when every step succeeded, -1 otherwise. mndTransDrop() is called
 * on the transaction in both cases.
 */
static int32_t mndCreateSnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode, SMCreateSnodeReq *pCreate) {
  int32_t result = -1;

  SSnodeObj snodeObj = {.id = pDnode->id, .createdTime = taosGetTimestampMs()};
  snodeObj.updateTime = snodeObj.createdTime;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_SNODE, &pReq->rpcMsg);
  if (pTrans != NULL) {
    mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);

    // && short-circuits: the steps run in order and stop at the first failure.
    if (mndSetCreateSnodeRedoLogs(pTrans, &snodeObj) == 0 &&
        mndSetCreateSnodeUndoLogs(pTrans, &snodeObj) == 0 &&
        mndSetCreateSnodeCommitLogs(pTrans, &snodeObj) == 0 &&
        mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj) == 0 &&
        mndSetCreateSnodeUndoActions(pTrans, pDnode, &snodeObj) == 0 &&
        mndTransPrepare(pMnode, pTrans) == 0) {
      result = 0;
    }
  }

  // Reached with pTrans == NULL when mndTransCreate() failed, as in the original flow.
  mndTransDrop(pTrans);
  return result;
}

S
Shengliang Guan 已提交
279 280
/*
 * Handler for TDMT_MND_CREATE_SNODE.
 *
 * Steps: deserialize the request, verify that no snode with the requested
 * dnode id exists, acquire the target dnode and the requesting user, check the
 * user with mndCheckNodeAuth(), then start the create transaction.
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS when the transaction was prepared,
 * -1 on failure with terrno set:
 *   TSDB_CODE_INVALID_MSG             - request could not be deserialized
 *   TSDB_CODE_MND_SNODE_ALREADY_EXIST - an snode with that id already exists
 *   TSDB_CODE_MND_DNODE_NOT_EXIST     - the dnode could not be acquired
 *   TSDB_CODE_MND_NO_USER_FROM_CONN   - the user could not be acquired
 *
 * Every object acquired here is released on all paths, including failures.
 */
static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq) {
  SMnode          *pMnode = pReq->pNode;
  int32_t          code = -1;
  SSnodeObj       *pObj = NULL;
  SDnodeObj       *pDnode = NULL;
  SUserObj        *pUser = NULL;
  SMCreateSnodeReq createReq = {0};

  if (tDeserializeSMCreateDropQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_SNODE_OVER;
  }

  mDebug("snode:%d, start to create", createReq.dnodeId);

  pObj = mndAcquireSnode(pMnode, createReq.dnodeId);
  if (pObj != NULL) {
    terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
    goto CREATE_SNODE_OVER;
  } else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) {
    // The lookup failed for a reason other than "not found": keep that terrno.
    goto CREATE_SNODE_OVER;
  }

  pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
  if (pDnode == NULL) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    goto CREATE_SNODE_OVER;
  }

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    goto CREATE_SNODE_OVER;
  }

  if (mndCheckNodeAuth(pUser)) {
    goto CREATE_SNODE_OVER;
  }

  code = mndCreateSnode(pMnode, pReq, pDnode, &createReq);
  if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

CREATE_SNODE_OVER:
  if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mError("snode:%d, failed to create since %s", createReq.dnodeId, terrstr());
  }

  // No early return on failure: the references acquired above must be handed
  // back on every path (code is -1 on all failure paths).
  mndReleaseSnode(pMnode, pObj);
  if (pDnode != NULL) {
    // Guarded because NULL-tolerance of mndReleaseDnode() is not visible in this file.
    mndReleaseDnode(pMnode, pDnode);
  }
  mndReleaseUser(pMnode, pUser);

  return code;
}

/*
 * Encode pObj, append it to the transaction's redo log, then mark the raw
 * record SDB_STATUS_DROPPING. Returns 0 on success, -1 if any step fails.
 *
 * NOTE(review): when the append fails the encoded record is not freed here —
 * confirm whether mndTransAppendRedolog() takes ownership on failure.
 */
static int32_t mndSetDropSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) {
  SSdbRaw *pRaw = mndSnodeActionEncode(pObj);
  if (pRaw == NULL) {
    return -1;
  }

  // || short-circuits, so the status is only set after a successful append.
  if (mndTransAppendRedolog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING) != 0) {
    return -1;
  }
  return 0;
}

/*
 * Encode pObj, append it to the transaction's commit log, then mark the raw
 * record SDB_STATUS_DROPPED. Returns 0 on success, -1 if any step fails.
 *
 * NOTE(review): when the append fails the encoded record is not freed here —
 * confirm whether mndTransAppendCommitlog() takes ownership on failure.
 */
static int32_t mndSetDropSnodeCommitLogs(STrans *pTrans, SSnodeObj *pObj) {
  SSdbRaw *pRaw = mndSnodeActionEncode(pObj);
  if (pRaw == NULL) {
    return -1;
  }

  // || short-circuits, so the status is only set after a successful append.
  if (mndTransAppendCommitlog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

/*
 * Queue the redo action of a drop-snode transaction: a TDMT_DND_DROP_SNODE
 * message addressed to pDnode, for which TSDB_CODE_NODE_NOT_DEPLOYED is an
 * acceptable reply code.
 *
 * pObj is not used. Returns 0 on success; -1 when the message buffer cannot be
 * allocated (terrno = TSDB_CODE_OUT_OF_MEMORY) or the action cannot be
 * appended (the buffer is freed in that case).
 */
static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj) {
  SDDropSnodeReq dropReq = {.dnodeId = pDnode->id};

  // The call with a NULL buffer supplies the size passed to malloc().
  // NOTE(review): the serializer's return values are not checked — confirm it cannot fail here.
  int32_t bufLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &dropReq);
  void   *pBuf = malloc(bufLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSMCreateDropQSBNodeReq(pBuf, bufLen, &dropReq);

  STransAction action = {.epSet = mndGetDnodeEpset(pDnode),
                         .pCont = pBuf,
                         .contLen = bufLen,
                         .msgType = TDMT_DND_DROP_SNODE,
                         .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED};

  if (mndTransAppendRedoAction(pTrans, &action) == 0) {
    return 0;
  }

  free(pBuf);
  return -1;
}

S
Shengliang Guan 已提交
377
/*
 * Build and prepare the transaction that drops the snode pObj.
 *
 * The transaction uses TRN_POLICY_RETRY and is given redo and commit logs plus
 * a redo action addressed to pObj->pDnode before mndTransPrepare().
 *
 * Returns 0 when every step succeeded, -1 otherwise. mndTransDrop() is called
 * on the transaction in both cases.
 */
static int32_t mndDropSnode(SMnode *pMnode, SNodeMsg *pReq, SSnodeObj *pObj) {
  int32_t result = -1;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, &pReq->rpcMsg);
  if (pTrans != NULL) {
    mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);

    // && short-circuits: the steps run in order and stop at the first failure.
    if (mndSetDropSnodeRedoLogs(pTrans, pObj) == 0 &&
        mndSetDropSnodeCommitLogs(pTrans, pObj) == 0 &&
        mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) == 0 &&
        mndTransPrepare(pMnode, pTrans) == 0) {
      result = 0;
    }
  }

  // Reached with pTrans == NULL when mndTransCreate() failed, as in the original flow.
  mndTransDrop(pTrans);
  return result;
}

S
Shengliang Guan 已提交
397 398
/*
 * Handler for TDMT_MND_DROP_SNODE.
 *
 * Steps: deserialize the request, reject non-positive dnode ids, acquire the
 * snode and the requesting user, check the user with mndCheckNodeAuth(), then
 * start the drop transaction.
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS when the transaction was prepared,
 * -1 on failure with terrno set:
 *   TSDB_CODE_INVALID_MSG           - request could not be deserialized
 *   TSDB_CODE_SDB_APP_ERROR         - dnodeId <= 0
 *   TSDB_CODE_MND_NO_USER_FROM_CONN - the user could not be acquired
 * (a failed snode lookup keeps the terrno left by mndAcquireSnode()).
 *
 * The snode and user references are released on every path.
 */
static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq) {
  SMnode        *pMnode = pReq->pNode;
  int32_t        code = -1;
  SUserObj      *pUser = NULL;
  SSnodeObj     *pObj = NULL;
  SMDropSnodeReq dropReq = {0};

  if (tDeserializeSMCreateDropQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto DROP_SNODE_OVER;
  }

  mDebug("snode:%d, start to drop", dropReq.dnodeId);

  if (dropReq.dnodeId <= 0) {
    terrno = TSDB_CODE_SDB_APP_ERROR;
    goto DROP_SNODE_OVER;
  }

  pObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
  if (pObj == NULL) {
    goto DROP_SNODE_OVER;
  }

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    goto DROP_SNODE_OVER;
  }

  if (mndCheckNodeAuth(pUser)) {
    goto DROP_SNODE_OVER;
  }

  code = mndDropSnode(pMnode, pReq, pObj);
  if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

DROP_SNODE_OVER:
  if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    // Log the id from the request (matching the "start to drop" line above),
    // not the id stored in pMnode.
    mError("snode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
  }

  mndReleaseSnode(pMnode, pObj);
  mndReleaseUser(pMnode, pUser);

  return code;
}

S
Shengliang Guan 已提交
445
/*
 * Handler for TDMT_DND_CREATE_SNODE_RSP: forwards the response to
 * mndTransProcessRsp() and always returns 0.
 */
static int32_t mndProcessCreateSnodeRsp(SNodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);

  return 0;
}

S
Shengliang Guan 已提交
450
/*
 * Handler for TDMT_DND_DROP_SNODE_RSP: forwards the response to
 * mndTransProcessRsp() and always returns 0.
 */
static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);

  return 0;
}

S
Shengliang Guan 已提交
455 456
/*
 * "show" meta handler for the snode table: describes three columns in
 * pMeta->pSchemas and mirrors their widths into pShow.
 *
 *   id          SMALLINT   2 bytes
 *   endpoint    BINARY     TSDB_EP_LEN + VARSTR_HEADER_SIZE bytes
 *   create_time TIMESTAMP  8 bytes
 *
 * Also fills the per-column offsets, the row size, the row count (taken from
 * sdbGetSize() for SDB_SNODE) and the table name. Always returns 0.
 */
static int32_t mndGetSnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode  *pMnode = pReq->pNode;
  SSchema *pSchema = pMeta->pSchemas;
  int32_t  numCols = 0;

  // column: id
  pShow->bytes[numCols] = 2;
  pSchema[numCols].bytes = pShow->bytes[numCols];
  pSchema[numCols].type = TSDB_DATA_TYPE_SMALLINT;
  strcpy(pSchema[numCols].name, "id");
  numCols++;

  // column: endpoint
  pShow->bytes[numCols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
  pSchema[numCols].bytes = pShow->bytes[numCols];
  pSchema[numCols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[numCols].name, "endpoint");
  numCols++;

  // column: create_time
  pShow->bytes[numCols] = 8;
  pSchema[numCols].bytes = pShow->bytes[numCols];
  pSchema[numCols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[numCols].name, "create_time");
  numCols++;

  pMeta->numOfColumns = numCols;
  pShow->numOfColumns = numCols;

  // Each column starts where the previous one ends.
  pShow->offset[0] = 0;
  int32_t col = 1;
  while (col < numCols) {
    pShow->offset[col] = pShow->offset[col - 1] + pShow->bytes[col - 1];
    ++col;
  }

  pShow->numOfRows = sdbGetSize(pMnode->pSdb, SDB_SNODE);
  pShow->rowSize = pShow->offset[numCols - 1] + pShow->bytes[numCols - 1];
  strcpy(pMeta->tbName, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
495 496
/*
 * "show" retrieve handler for the snode table: fetches up to `rows` snodes
 * through the iterator kept in pShow->pIter and writes their id, dnode
 * endpoint and creation time into `data`.
 *
 * The cell for column c of row r is written at
 *   data + pShow->offset[c] * rows + pShow->bytes[c] * r
 * and mndVacuumResult() is called on the buffer once the loop ends.
 *
 * Returns the number of rows written; pShow->numOfReads grows by that count.
 */
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  enum { COL_ID = 0, COL_ENDPOINT = 1, COL_CREATE_TIME = 2 };

  SMnode    *pMnode = pReq->pNode;
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pObj = NULL;
  char      *pWrite;
  int32_t    numOfRows;

  for (numOfRows = 0; numOfRows < rows; numOfRows++) {
    pShow->pIter = sdbFetch(pSdb, SDB_SNODE, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) break;

    // The id column is 2 bytes wide, so the id is stored as int16_t.
    pWrite = data + pShow->offset[COL_ID] * rows + pShow->bytes[COL_ID] * numOfRows;
    *(int16_t *)pWrite = pObj->id;

    pWrite = data + pShow->offset[COL_ENDPOINT] * rows + pShow->bytes[COL_ENDPOINT] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pObj->pDnode->ep, pShow->bytes[COL_ENDPOINT]);

    pWrite = data + pShow->offset[COL_CREATE_TIME] * rows + pShow->bytes[COL_CREATE_TIME] * numOfRows;
    *(int64_t *)pWrite = pObj->createdTime;

    sdbRelease(pSdb, pObj);
  }

  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  pShow->numOfReads += numOfRows;

  return numOfRows;
}

/* "show" free-iterator handler: cancels the pending sdb fetch behind pIter. */
static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}