mndSnode.c 16.3 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndSnode.h"
S
Shengliang Guan 已提交
18
#include "mndAuth.h"
S
Shengliang Guan 已提交
19 20 21
#include "mndDnode.h"
#include "mndShow.h"
#include "mndTrans.h"
S
Shengliang Guan 已提交
22
#include "mndUser.h"
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24
#define TSDB_SNODE_VER_NUMBER   1
S
Shengliang Guan 已提交
25 26 27 28 29 30
#define TSDB_SNODE_RESERVE_SIZE 64

static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj);
static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw);
static int32_t  mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj);
static int32_t  mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj);
S
Shengliang Guan 已提交
31
static int32_t  mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew);
S
Shengliang Guan 已提交
32 33 34 35 36 37
static int32_t  mndProcessCreateSnodeReq(SNodeMsg *pReq);
static int32_t  mndProcessDropSnodeReq(SNodeMsg *pReq);
static int32_t  mndProcessCreateSnodeRsp(SNodeMsg *pRsp);
static int32_t  mndProcessDropSnodeRsp(SNodeMsg *pRsp);
static int32_t  mndGetSnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t  mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
static void     mndCancelGetNextSnode(SMnode *pMnode, void *pIter);

/*
 * Initialises the snode module of the mnode.
 *
 * Registers the create/drop request and response handlers, the "show"
 * handlers for TSDB_MGMT_TABLE_SNODE, and finally the SDB_SNODE table
 * description (int32 key plus encode/decode/insert/update/delete callbacks).
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitSnode(SMnode *pMnode) {
  SSdbTable snodeTable = {0};
  snodeTable.sdbType = SDB_SNODE;
  snodeTable.keyType = SDB_KEY_INT32;
  snodeTable.encodeFp = (SdbEncodeFp)mndSnodeActionEncode;
  snodeTable.decodeFp = (SdbDecodeFp)mndSnodeActionDecode;
  snodeTable.insertFp = (SdbInsertFp)mndSnodeActionInsert;
  snodeTable.updateFp = (SdbUpdateFp)mndSnodeActionUpdate;
  snodeTable.deleteFp = (SdbDeleteFp)mndSnodeActionDelete;

  /* Client requests and the dnode responses that complete them. */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SNODE, mndProcessCreateSnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_SNODE, mndProcessDropSnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_SNODE_RSP, mndProcessCreateSnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_SNODE_RSP, mndProcessDropSnodeRsp);

  /* Handlers backing the snode "show" table. */
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndGetSnodeMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndRetrieveSnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndCancelGetNextSnode);

  return sdbSetTable(pMnode->pSdb, snodeTable);
}

/* Cleanup counterpart of mndInitSnode(); currently there is nothing to do. */
void mndCleanupSnode(SMnode *pMnode) {
  (void)pMnode;
}

/*
 * Looks up the snode with key snodeId through sdbAcquire().
 *
 * Returns the object, or NULL on failure. When the failure code is
 * TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 * TSDB_CODE_MND_SNODE_NOT_EXIST; any other terrno is left as is.
 */
static SSnodeObj *mndAcquireSnode(SMnode *pMnode, int32_t snodeId) {
  SSnodeObj *pSnode = sdbAcquire(pMnode->pSdb, SDB_SNODE, &snodeId);
  if (pSnode != NULL) {
    return pSnode;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_SNODE_NOT_EXIST;
  }
  return NULL;
}

/* Hands pObj back to the mnode's SDB via sdbRelease(). */
static void mndReleaseSnode(SMnode *pMnode, SSnodeObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}

/*
 * Encodes an snode object into a newly allocated SDB raw record.
 *
 * Layout (version TSDB_SNODE_VER_NUMBER): id (int32), createdTime (int64),
 * updateTime (int64), followed by TSDB_SNODE_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record, or NULL on failure with terrno set; the partially
 * built record is released with sdbFreeRaw() in that case.
 */
static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj) {
  int32_t  offset = 0;
  SSdbRaw *pRaw = NULL;

  /* Assume failure until every field has been written. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_SNODE, TSDB_SNODE_VER_NUMBER, sizeof(SSnodeObj) + TSDB_SNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto SNODE_ENCODE_OVER;

  SDB_SET_INT32(pRaw, offset, pObj->id, SNODE_ENCODE_OVER)
  SDB_SET_INT64(pRaw, offset, pObj->createdTime, SNODE_ENCODE_OVER)
  SDB_SET_INT64(pRaw, offset, pObj->updateTime, SNODE_ENCODE_OVER)
  SDB_SET_RESERVE(pRaw, offset, TSDB_SNODE_RESERVE_SIZE, SNODE_ENCODE_OVER)

  terrno = 0;

SNODE_ENCODE_OVER:
  if (terrno == 0) {
    mTrace("snode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("snode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

/*
 * Decodes an SDB raw record into a newly allocated snode row.
 *
 * The record must carry soft version TSDB_SNODE_VER_NUMBER; otherwise terrno
 * is set to TSDB_CODE_SDB_INVALID_DATA_VER. Fields are read in the order
 * written by mndSnodeActionEncode(): id, createdTime, updateTime, reserve.
 *
 * Returns the row, or NULL on failure with terrno set.
 */
static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) {
  /*
   * Both pointers are declared and NULL-initialised before the first goto:
   * a jump to SNODE_DECODE_OVER that bypasses their initialisation would
   * leave them indeterminate, and the error path reads both of them.
   */
  SSdbRow   *pRow = NULL;
  SSnodeObj *pObj = NULL;
  int32_t    dataPos = 0;
  int8_t     sver = 0;

  /* Assume failure until every field has been read. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SNODE_DECODE_OVER;

  if (sver != TSDB_SNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SNODE_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SSnodeObj));
  if (pRow == NULL) goto SNODE_DECODE_OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto SNODE_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &pObj->id, SNODE_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, SNODE_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, SNODE_DECODE_OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE, SNODE_DECODE_OVER)

  terrno = 0;

SNODE_DECODE_OVER:
  if (terrno != 0) {
    /* pObj may still be NULL if the failure happened before the row existed. */
    mError("snode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }

  mTrace("snode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRow;
}

/*
 * SDB insert callback for snode rows.
 *
 * Acquires the SDB_DNODE object whose key equals the snode id and stores it
 * in pObj->pDnode. Returns 0 on success; when no such dnode can be acquired,
 * sets terrno to TSDB_CODE_MND_DNODE_NOT_EXIST and returns -1.
 */
static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj) {
  mTrace("snode:%d, perform insert action, row:%p", pObj->id, pObj);

  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  mError("snode:%d, failed to perform insert action since %s", pObj->id, terrstr());
  return -1;
}

/*
 * SDB delete callback for snode rows.
 *
 * Releases the dnode stored in pObj->pDnode (if any) and clears the pointer.
 * Always returns 0.
 */
static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) {
  mTrace("snode:%d, perform delete action, row:%p", pObj->id, pObj);

  if (pObj->pDnode == NULL) {
    return 0;
  }

  sdbRelease(pSdb, pObj->pDnode);
  pObj->pDnode = NULL;
  return 0;
}

S
Shengliang Guan 已提交
159
/*
 * SDB update callback for snode rows.
 *
 * Only updateTime is carried over from the new row to the existing one.
 * Always returns 0.
 */
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew) {
  (void)pSdb; /* not needed for this update */

  mTrace("snode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;

  return 0;
}

/*
 * Encodes pObj, appends it to the redo log of pTrans, then marks the raw
 * record SDB_STATUS_CREATING. Returns 0 on success, -1 if any step fails.
 *
 * NOTE(review): the raw record is not freed here when the append fails;
 * confirm whether mndTransAppendRedolog() owns it in that case.
 */
static int32_t mndSetCreateSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) {
  SSdbRaw *pRaw = mndSnodeActionEncode(pObj);
  if (pRaw == NULL) {
    return -1;
  }

  /* Short-circuit keeps the order: append first, set the status second. */
  if (mndTransAppendRedolog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_CREATING) != 0) {
    return -1;
  }
  return 0;
}

173 174 175 176 177 178 179 180
/*
 * Encodes pObj, appends it to the undo log of pTrans, then marks the raw
 * record SDB_STATUS_DROPPED. Returns 0 on success, -1 if any step fails.
 *
 * NOTE(review): the raw record is not freed here when the append fails;
 * confirm whether mndTransAppendUndolog() owns it in that case.
 */
static int32_t mndSetCreateSnodeUndoLogs(STrans *pTrans, SSnodeObj *pObj) {
  SSdbRaw *pRaw = mndSnodeActionEncode(pObj);
  if (pRaw == NULL) {
    return -1;
  }

  /* Short-circuit keeps the order: append first, set the status second. */
  if (mndTransAppendUndolog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
181 182 183 184 185 186 187 188 189
/*
 * Encodes pObj, appends it to the commit log of pTrans, then marks the raw
 * record SDB_STATUS_READY. Returns 0 on success, -1 if any step fails.
 *
 * NOTE(review): the raw record is not freed here when the append fails;
 * confirm whether mndTransAppendCommitlog() owns it in that case.
 */
static int32_t mndSetCreateSnodeCommitLogs(STrans *pTrans, SSnodeObj *pObj) {
  SSdbRaw *pRaw = mndSnodeActionEncode(pObj);
  if (pRaw == NULL) {
    return -1;
  }

  /* Short-circuit keeps the order: append first, set the status second. */
  if (mndTransAppendCommitlog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }
  return 0;
}

/*
 * Appends the redo action of a create-snode transaction: a
 * TDMT_DND_CREATE_SNODE message carrying the dnode id, addressed to the
 * endpoint set of pDnode. TSDB_CODE_NODE_ALREADY_DEPLOYED is set as the
 * action's acceptable code. pObj is not used.
 *
 * Returns 0 on success, -1 on failure (terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY when the request buffer cannot be allocated).
 */
static int32_t mndSetCreateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj) {
  SDCreateSnodeReq req = {0};
  req.dnodeId = pDnode->id;

  /* The return value of the NULL-buffer call is used as the buffer size. */
  int32_t reqLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &req);
  void   *pBuf = malloc(reqLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSMCreateDropQSBNodeReq(pBuf, reqLen, &req);

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
  action.msgType = TDMT_DND_CREATE_SNODE;
  action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
  action.pCont = pBuf;
  action.contLen = reqLen;

  if (mndTransAppendRedoAction(pTrans, &action) == 0) {
    return 0;
  }

  /* The action was not appended, so the buffer is freed here. */
  free(pBuf);
  return -1;
}

216
/*
 * Appends the undo action of a create-snode transaction: a
 * TDMT_DND_DROP_SNODE message carrying the dnode id, addressed to the
 * endpoint set of pDnode. TSDB_CODE_NODE_NOT_DEPLOYED is set as the action's
 * acceptable code. pObj is not used.
 *
 * Returns 0 on success, -1 on failure (terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY when the request buffer cannot be allocated).
 */
static int32_t mndSetCreateSnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj) {
  SDDropSnodeReq req = {0};
  req.dnodeId = pDnode->id;

  /* The return value of the NULL-buffer call is used as the buffer size. */
  int32_t reqLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &req);
  void   *pBuf = malloc(reqLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSMCreateDropQSBNodeReq(pBuf, reqLen, &req);

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
  action.msgType = TDMT_DND_DROP_SNODE;
  action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
  action.pCont = pBuf;
  action.contLen = reqLen;

  if (mndTransAppendUndoAction(pTrans, &action) == 0) {
    return 0;
  }

  /* The action was not appended, so the buffer is freed here. */
  free(pBuf);
  return -1;
}

S
Shengliang Guan 已提交
243
/*
 * Builds and prepares the transaction that creates an snode on pDnode.
 *
 * The new snode takes the dnode's id, with createdTime and updateTime both
 * set to the current millisecond timestamp. The transaction uses
 * TRN_POLICY_ROLLBACK and receives redo/undo/commit logs plus redo/undo
 * actions before mndTransPrepare() is called.
 *
 * Returns 0 when the transaction was prepared, -1 otherwise. The local
 * transaction handle is passed to mndTransDrop() on every path.
 */
static int32_t mndCreateSnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode, SMCreateSnodeReq *pCreate) {
  int32_t   code = -1;
  SSnodeObj snodeObj = {0};

  snodeObj.id = pDnode->id;
  snodeObj.createdTime = taosGetTimestampMs();
  snodeObj.updateTime = snodeObj.createdTime;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_SNODE, &pReq->rpcMsg);
  if (pTrans == NULL) goto CREATE_SNODE_OVER;

  mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);

  /* Steps run strictly in this order; the first failure aborts the chain. */
  if (mndSetCreateSnodeRedoLogs(pTrans, &snodeObj) != 0 ||
      mndSetCreateSnodeUndoLogs(pTrans, &snodeObj) != 0 ||
      mndSetCreateSnodeCommitLogs(pTrans, &snodeObj) != 0 ||
      mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj) != 0 ||
      mndSetCreateSnodeUndoActions(pTrans, pDnode, &snodeObj) != 0 ||
      mndTransPrepare(pMnode, pTrans) != 0) {
    goto CREATE_SNODE_OVER;
  }

  code = 0;

CREATE_SNODE_OVER:
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
270 271
/*
 * Handles TDMT_MND_CREATE_SNODE.
 *
 * Steps: deserialize the request, verify that no snode with the requested
 * dnode id exists, acquire the target dnode and the requesting user, check
 * the user with mndCheckNodeAuth(), then start the create transaction.
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS when the transaction was started,
 * or -1 on failure with terrno describing the reason
 * (TSDB_CODE_INVALID_MSG, TSDB_CODE_MND_SNODE_ALREADY_EXIST,
 * TSDB_CODE_MND_DNODE_NOT_EXIST, TSDB_CODE_MND_NO_USER_FROM_CONN, or
 * whatever a callee left in terrno).
 */
static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq) {
  SMnode          *pMnode = pReq->pNode;
  int32_t          code = -1;
  SSnodeObj       *pObj = NULL;
  SDnodeObj       *pDnode = NULL;
  SUserObj        *pUser = NULL;
  SMCreateSnodeReq createReq = {0};

  if (tDeserializeSMCreateDropQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_SNODE_OVER;
  }

  mDebug("snode:%d, start to create", createReq.dnodeId);

  pObj = mndAcquireSnode(pMnode, createReq.dnodeId);
  if (pObj != NULL) {
    terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
    goto CREATE_SNODE_OVER;
  } else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) {
    /* The lookup failed for a reason other than "not found". */
    goto CREATE_SNODE_OVER;
  }

  pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
  if (pDnode == NULL) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    goto CREATE_SNODE_OVER;
  }

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    goto CREATE_SNODE_OVER;
  }

  if (mndCheckNodeAuth(pUser)) {
    goto CREATE_SNODE_OVER;
  }

  code = mndCreateSnode(pMnode, pReq, pDnode, &createReq);
  if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

CREATE_SNODE_OVER:
  if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mError("snode:%d, failed to create since %s", createReq.dnodeId, terrstr());
  }

  /*
   * Release on every path, failures included: any of these objects may have
   * been acquired before the error occurred (e.g. pObj is non-NULL when the
   * snode already exists). Returning before this point would skip the
   * releases.
   */
  if (pObj != NULL) mndReleaseSnode(pMnode, pObj);
  if (pDnode != NULL) mndReleaseDnode(pMnode, pDnode);
  if (pUser != NULL) mndReleaseUser(pMnode, pUser);

  return code;
}

/*
 * Encodes pObj, appends it to the redo log of pTrans, then marks the raw
 * record SDB_STATUS_DROPPING. Returns 0 on success, -1 if any step fails.
 *
 * NOTE(review): the raw record is not freed here when the append fails;
 * confirm whether mndTransAppendRedolog() owns it in that case.
 */
static int32_t mndSetDropSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) {
  SSdbRaw *pRaw = mndSnodeActionEncode(pObj);
  if (pRaw == NULL) {
    return -1;
  }

  /* Short-circuit keeps the order: append first, set the status second. */
  if (mndTransAppendRedolog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING) != 0) {
    return -1;
  }
  return 0;
}

/*
 * Encodes pObj, appends it to the commit log of pTrans, then marks the raw
 * record SDB_STATUS_DROPPED. Returns 0 on success, -1 if any step fails.
 *
 * NOTE(review): the raw record is not freed here when the append fails;
 * confirm whether mndTransAppendCommitlog() owns it in that case.
 */
static int32_t mndSetDropSnodeCommitLogs(STrans *pTrans, SSnodeObj *pObj) {
  SSdbRaw *pRaw = mndSnodeActionEncode(pObj);
  if (pRaw == NULL) {
    return -1;
  }

  /* Short-circuit keeps the order: append first, set the status second. */
  if (mndTransAppendCommitlog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

/*
 * Appends the redo action of a drop-snode transaction: a
 * TDMT_DND_DROP_SNODE message carrying the dnode id, addressed to the
 * endpoint set of pDnode. TSDB_CODE_NODE_NOT_DEPLOYED is set as the action's
 * acceptable code. pObj is not used.
 *
 * Returns 0 on success, -1 on failure (terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY when the request buffer cannot be allocated).
 */
static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj) {
  SDDropSnodeReq req = {0};
  req.dnodeId = pDnode->id;

  /* The return value of the NULL-buffer call is used as the buffer size. */
  int32_t reqLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &req);
  void   *pBuf = malloc(reqLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSMCreateDropQSBNodeReq(pBuf, reqLen, &req);

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
  action.msgType = TDMT_DND_DROP_SNODE;
  action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
  action.pCont = pBuf;
  action.contLen = reqLen;

  if (mndTransAppendRedoAction(pTrans, &action) == 0) {
    return 0;
  }

  /* The action was not appended, so the buffer is freed here. */
  free(pBuf);
  return -1;
}

S
Shengliang Guan 已提交
368
/*
 * Builds and prepares the transaction that drops the snode pObj.
 *
 * The transaction uses TRN_POLICY_RETRY and receives redo and commit logs
 * plus a redo action sent to pObj->pDnode before mndTransPrepare() is called.
 *
 * Returns 0 when the transaction was prepared, -1 otherwise. The local
 * transaction handle is passed to mndTransDrop() on every path.
 */
static int32_t mndDropSnode(SMnode *pMnode, SNodeMsg *pReq, SSnodeObj *pObj) {
  int32_t code = -1;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, &pReq->rpcMsg);
  if (pTrans == NULL) goto DROP_SNODE_OVER;

  mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);

  /* Steps run strictly in this order; the first failure aborts the chain. */
  if (mndSetDropSnodeRedoLogs(pTrans, pObj) != 0 ||
      mndSetDropSnodeCommitLogs(pTrans, pObj) != 0 ||
      mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0 ||
      mndTransPrepare(pMnode, pTrans) != 0) {
    goto DROP_SNODE_OVER;
  }

  code = 0;

DROP_SNODE_OVER:
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
388 389
/*
 * Handles TDMT_MND_DROP_SNODE.
 *
 * Steps: deserialize the request, reject non-positive dnode ids, acquire the
 * snode and the requesting user, check the user with mndCheckNodeAuth(),
 * then start the drop transaction.
 *
 * Returns TSDB_CODE_MND_ACTION_IN_PROGRESS when the transaction was started,
 * or -1 on failure with terrno describing the reason. Acquired objects are
 * released on every path.
 */
static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq) {
  SMnode        *pMnode = pReq->pNode;
  int32_t        code = -1;
  SUserObj      *pUser = NULL;
  SSnodeObj     *pObj = NULL;
  SMDropSnodeReq dropReq = {0};

  if (tDeserializeSMCreateDropQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto DROP_SNODE_OVER;
  }

  mDebug("snode:%d, start to drop", dropReq.dnodeId);

  if (dropReq.dnodeId <= 0) {
    terrno = TSDB_CODE_SDB_APP_ERROR;
    goto DROP_SNODE_OVER;
  }

  /* On failure mndAcquireSnode() has already set terrno. */
  pObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
  if (pObj == NULL) {
    goto DROP_SNODE_OVER;
  }

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    goto DROP_SNODE_OVER;
  }

  if (mndCheckNodeAuth(pUser)) {
    goto DROP_SNODE_OVER;
  }

  code = mndDropSnode(pMnode, pReq, pObj);
  if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

DROP_SNODE_OVER:
  if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    /* Log the id from the request, matching the "start to drop" message. */
    mError("snode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
  }

  mndReleaseSnode(pMnode, pObj);
  mndReleaseUser(pMnode, pUser);

  return code;
}

S
Shengliang Guan 已提交
436
/*
 * Handles TDMT_DND_CREATE_SNODE_RSP by forwarding the response to
 * mndTransProcessRsp(). Always returns 0.
 */
static int32_t mndProcessCreateSnodeRsp(SNodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);

  return 0;
}

S
Shengliang Guan 已提交
441
/*
 * Handles TDMT_DND_DROP_SNODE_RSP by forwarding the response to
 * mndTransProcessRsp(). Always returns 0.
 */
static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);

  return 0;
}

S
Shengliang Guan 已提交
446 447
/*
 * Fills in the schema of the snode "show" table.
 *
 * Three columns are described: "id" (SMALLINT, 2 bytes), "endpoint"
 * (BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE bytes) and "create_time"
 * (TIMESTAMP, 8 bytes). Column widths and offsets, the row size and the
 * current number of snode rows are stored in pShow; the column count and
 * table name are stored in pMeta. Always returns 0.
 */
static int32_t mndGetSnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode  *pMnode = pReq->pNode;
  SSdb    *pSdb = pMnode->pSdb;
  SSchema *pSchema = pMeta->pSchemas;
  int32_t  numCols = 0;

  /* Column: snode id. */
  pSchema[numCols].type = TSDB_DATA_TYPE_SMALLINT;
  strcpy(pSchema[numCols].name, "id");
  pShow->bytes[numCols] = 2;
  pSchema[numCols].bytes = pShow->bytes[numCols];
  numCols++;

  /* Column: endpoint of the hosting dnode. */
  pSchema[numCols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[numCols].name, "endpoint");
  pShow->bytes[numCols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
  pSchema[numCols].bytes = pShow->bytes[numCols];
  numCols++;

  /* Column: creation timestamp. */
  pSchema[numCols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[numCols].name, "create_time");
  pShow->bytes[numCols] = 8;
  pSchema[numCols].bytes = pShow->bytes[numCols];
  numCols++;

  pMeta->numOfColumns = numCols;
  pShow->numOfColumns = numCols;

  /* Each column starts where the previous one ends. */
  pShow->offset[0] = 0;
  int32_t col = 1;
  while (col < numCols) {
    pShow->offset[col] = pShow->offset[col - 1] + pShow->bytes[col - 1];
    ++col;
  }

  pShow->numOfRows = sdbGetSize(pSdb, SDB_SNODE);
  pShow->rowSize = pShow->offset[numCols - 1] + pShow->bytes[numCols - 1];
  strcpy(pMeta->tbName, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
486 487
/*
 * Writes up to `rows` snode rows into `data` for the snode "show" table.
 *
 * Iteration resumes from pShow->pIter. Each value is written at
 * data + offset[col] * rows + bytes[col] * rowIndex: id (int16), the
 * endpoint of the snode's dnode (var-string) and createdTime (int64).
 * Afterwards mndVacuumResult() is called and pShow->numOfReads is advanced.
 *
 * Returns the number of rows written.
 */
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode    *pMnode = pReq->pNode;
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;
  int32_t    numOfRows = 0;

  for (; numOfRows < rows; numOfRows++) {
    pShow->pIter = sdbFetch(pSdb, SDB_SNODE, pShow->pIter, (void **)&pSnode);
    if (pShow->pIter == NULL) break;

    int32_t col = 0;
    char   *pCell = NULL;

    /* id */
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    *(int16_t *)pCell = pSnode->id;
    col++;

    /* endpoint */
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pCell, pSnode->pDnode->ep, pShow->bytes[col]);
    col++;

    /* create_time */
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    *(int64_t *)pCell = pSnode->createdTime;

    sdbRelease(pSdb, pSnode);
  }

  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  pShow->numOfReads += numOfRows;

  return numOfRows;
}

/* Cancels an in-progress snode iteration by passing pIter to sdbCancelFetch(). */
static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}