mndMnode.c 21.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndMnode.h"
S
Shengliang Guan 已提交
18
#include "mndAuth.h"
S
Shengliang Guan 已提交
19 20
#include "mndDnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
21
#include "mndTrans.h"
S
Shengliang Guan 已提交
22
#include "mndUser.h"
23
#include "mndSync.h"
S
Shengliang Guan 已提交
24

25 26
#define MNODE_VER_NUMBER   1
#define MNODE_RESERVE_SIZE 64
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
static int32_t  mndCreateDefaultMnode(SMnode *pMnode);
S
Shengliang Guan 已提交
29
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj);
S
Shengliang Guan 已提交
30
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw);
S
Shengliang Guan 已提交
31 32
static int32_t  mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj);
static int32_t  mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj);
S
Shengliang Guan 已提交
33
static int32_t  mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew);
S
Shengliang Guan 已提交
34
static int32_t  mndProcessCreateMnodeReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
35
static int32_t  mndProcessAlterMnodeReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
36 37 38 39 40
static int32_t  mndProcessDropMnodeReq(SRpcMsg *pReq);
static int32_t  mndProcessCreateMnodeRsp(SRpcMsg *pRsp);
static int32_t  mndProcessAlterMnodeRsp(SRpcMsg *pRsp);
static int32_t  mndProcessDropMnodeRsp(SRpcMsg *pRsp);
static int32_t  mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
41
static void     mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
42 43

int32_t mndInitMnode(SMnode *pMnode) {
S
Shengliang Guan 已提交
44 45 46 47 48 49 50 51 52 53
  SSdbTable table = {
      .sdbType = SDB_MNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultMnode,
      .encodeFp = (SdbEncodeFp)mndMnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndMnodeActionDecode,
      .insertFp = (SdbInsertFp)mndMnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndMnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndMnodeActionDelete,
  };
S
Shengliang Guan 已提交
54

H
Hongze Cheng 已提交
55
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
S
Shengliang Guan 已提交
56
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE, mndProcessAlterMnodeReq);
H
Hongze Cheng 已提交
57 58 59 60
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndProcessCreateMnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE_RSP, mndProcessAlterMnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndProcessDropMnodeRsp);
S
Shengliang Guan 已提交
61 62 63

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
S
Shengliang Guan 已提交
64 65 66 67 68 69

  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupMnode(SMnode *pMnode) {}

70 71
SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
  SMnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId);
S
Shengliang Guan 已提交
72
  if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
S
Shengliang Guan 已提交
73 74 75
    terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
  }
  return pObj;
S
Shengliang Guan 已提交
76 77
}

78
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
S
Shengliang Guan 已提交
79
  SSdb *pSdb = pMnode->pSdb;
80
  sdbRelease(pMnode->pSdb, pObj);
S
Shengliang Guan 已提交
81 82
}

S
Shengliang Guan 已提交
83 84 85 86 87 88 89 90 91 92
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
  SMnodeObj mnodeObj = {0};
  mnodeObj.id = 1;
  mnodeObj.createdTime = taosGetTimestampMs();
  mnodeObj.updateTime = mnodeObj.createdTime;

  SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj);
  if (pRaw == NULL) return -1;
  sdbSetRawStatus(pRaw, SDB_STATUS_READY);

93
  mDebug("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);
94

95
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
  if (pTrans == NULL) {
    mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
    return -1;
  }
  mDebug("trans:%d, used to create mnode:%d", pTrans->id, mnodeObj.id);

  if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }

  mndTransDrop(pTrans);
  return 0;
S
Shengliang Guan 已提交
117 118
}

S
Shengliang Guan 已提交
119
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
120 121
  terrno = TSDB_CODE_OUT_OF_MEMORY;

122 123
  SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, MNODE_VER_NUMBER, sizeof(SMnodeObj) + MNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;
S
Shengliang Guan 已提交
124 125

  int32_t dataPos = 0;
126 127 128 129
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)
130 131 132

  terrno = 0;

133
_OVER:
134 135 136 137 138
  if (terrno != 0) {
    mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }
S
Shengliang Guan 已提交
139

140
  mTrace("mnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
S
Shengliang Guan 已提交
141 142 143 144
  return pRaw;
}

static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
145 146
  terrno = TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
147 148 149
  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;

150
  if (sver != MNODE_VER_NUMBER) {
S
Shengliang Guan 已提交
151
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
152
    goto _OVER;
S
Shengliang Guan 已提交
153 154
  }

155
  SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj));
156
  if (pRow == NULL) goto _OVER;
157

S
Shengliang Guan 已提交
158
  SMnodeObj *pObj = sdbGetRowObj(pRow);
159
  if (pObj == NULL) goto _OVER;
S
Shengliang Guan 已提交
160 161

  int32_t dataPos = 0;
162 163 164 165
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)
166 167 168

  terrno = 0;

169
_OVER:
170 171
  if (terrno != 0) {
    mError("mnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
172
    taosMemoryFreeClear(pRow);
173 174
    return NULL;
  }
S
Shengliang Guan 已提交
175

176
  mTrace("mnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
S
Shengliang Guan 已提交
177 178 179
  return pRow;
}

S
Shengliang Guan 已提交
180
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
181
  mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);
S
Shengliang Guan 已提交
182 183
  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode == NULL) {
S
Shengliang Guan 已提交
184
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
S
Shengliang Guan 已提交
185
    mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
S
Shengliang Guan 已提交
186 187 188
    return -1;
  }

189
  pObj->state = TAOS_SYNC_STATE_ERROR;
S
Shengliang Guan 已提交
190 191 192
  return 0;
}

S
Shengliang Guan 已提交
193
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
194
  mTrace("mnode:%d, perform delete action, row:%p", pObj->id, pObj);
S
Shengliang Guan 已提交
195 196 197
  if (pObj->pDnode != NULL) {
    sdbRelease(pSdb, pObj->pDnode);
    pObj->pDnode = NULL;
S
Shengliang Guan 已提交
198 199 200 201 202
  }

  return 0;
}

S
Shengliang Guan 已提交
203
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
S
Shengliang Guan 已提交
204
  mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
S
Shengliang Guan 已提交
205
  pOld->updateTime = pNew->updateTime;
S
Shengliang Guan 已提交
206
  return 0;
S
Shengliang Guan 已提交
207 208 209 210 211
}

bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb *pSdb = pMnode->pSdb;

S
Shengliang Guan 已提交
212 213
  SMnodeObj *pObj = sdbAcquire(pSdb, SDB_MNODE, &dnodeId);
  if (pObj == NULL) {
S
Shengliang Guan 已提交
214 215 216
    return false;
  }

S
Shengliang Guan 已提交
217
  sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
218
  return true;
S
Shengliang Guan 已提交
219 220
}

S
Shengliang Guan 已提交
221
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
222 223 224
  SSdb   *pSdb = pMnode->pSdb;
  int32_t totalMnodes = sdbGetSize(pSdb, SDB_MNODE);
  void   *pIter = NULL;
S
Shengliang Guan 已提交
225 226

  while (1) {
S
Shengliang Guan 已提交
227 228
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
S
Shengliang Guan 已提交
229
    if (pIter == NULL) break;
230 231 232

    if (pObj->id == pMnode->selfDnodeId) {
      if (mndIsMaster(pMnode)) {
233
        pEpSet->inUse = pEpSet->numOfEps;
234 235
      } else {
        pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes;
236
      }
S
Shengliang Guan 已提交
237
    }
238 239
    addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
240
  }
S
Shengliang Guan 已提交
241 242
}

243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
  if (pRedoRaw == NULL) return -1;
  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
  return 0;
}

static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj);
  if (pUndoRaw == NULL) return -1;
  if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
  if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
267
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
S
Shengliang Guan 已提交
268 269 270 271
  SSdb            *pSdb = pMnode->pSdb;
  void            *pIter = NULL;
  int32_t          numOfReplicas = 0;
  SDAlterMnodeReq  alterReq = {0};
S
Shengliang Guan 已提交
272
  SDCreateMnodeReq createReq = {0};
S
Shengliang Guan 已提交
273 274 275
  SEpSet           alterEpset = {0};
  SEpSet           createEpset = {0};

S
Shengliang Guan 已提交
276 277 278 279
  while (1) {
    SMnodeObj *pMObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
    if (pIter == NULL) break;
280

S
Shengliang Guan 已提交
281 282 283
    alterReq.replicas[numOfReplicas].id = pMObj->id;
    alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
284

S
Shengliang Guan 已提交
285 286 287 288 289 290 291
    alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
    if (pMObj->state == TAOS_SYNC_STATE_LEADER) {
      alterEpset.inUse = numOfReplicas;
    }

    numOfReplicas++;
S
Shengliang Guan 已提交
292 293 294
    sdbRelease(pSdb, pMObj);
  }

S
Shengliang Guan 已提交
295 296 297 298
  alterReq.replica = numOfReplicas + 1;
  alterReq.replicas[numOfReplicas].id = pDnode->id;
  alterReq.replicas[numOfReplicas].port = pDnode->port;
  memcpy(alterReq.replicas[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
299

S
Shengliang Guan 已提交
300 301 302
  alterEpset.numOfEps = numOfReplicas + 1;
  alterEpset.eps[numOfReplicas].port = pDnode->port;
  memcpy(alterEpset.eps[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
303

S
Shengliang Guan 已提交
304 305 306 307
  createReq.replica = 1;
  createReq.replicas[0].id = pDnode->id;
  createReq.replicas[0].port = pDnode->port;
  memcpy(createReq.replicas[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
308

S
Shengliang Guan 已提交
309 310 311
  createEpset.numOfEps = 1;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
312

S
Shengliang Guan 已提交
313
  {
314
    int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
wafwerar's avatar
wafwerar 已提交
315
    void   *pReq = taosMemoryMalloc(contLen);
316
    tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);
S
Shengliang Guan 已提交
317

S
Shengliang Guan 已提交
318
    STransAction action = {
319
        .epSet = createEpset,
S
Shengliang Guan 已提交
320 321
        .pCont = pReq,
        .contLen = contLen,
322 323
        .msgType = TDMT_DND_CREATE_MNODE,
        .acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED,
S
Shengliang Guan 已提交
324
    };
S
Shengliang Guan 已提交
325 326

    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
wafwerar's avatar
wafwerar 已提交
327
      taosMemoryFree(pReq);
S
Shengliang Guan 已提交
328 329 330 331 332
      return -1;
    }
  }

  {
333
    int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
wafwerar's avatar
wafwerar 已提交
334
    void   *pReq = taosMemoryMalloc(contLen);
335
    tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
S
Shengliang Guan 已提交
336

S
Shengliang Guan 已提交
337
    STransAction action = {
338
        .epSet = alterEpset,
S
Shengliang Guan 已提交
339 340
        .pCont = pReq,
        .contLen = contLen,
341 342
        .msgType = TDMT_DND_ALTER_MNODE,
        .acceptableCode = 0,
S
Shengliang Guan 已提交
343 344
    };

S
Shengliang Guan 已提交
345
    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
wafwerar's avatar
wafwerar 已提交
346
      taosMemoryFree(pReq);
S
Shengliang Guan 已提交
347 348
      return -1;
    }
349 350 351 352 353
  }

  return 0;
}

S
Shengliang Guan 已提交
354
static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
S
Shengliang Guan 已提交
355 356
  int32_t code = -1;

S
Shengliang Guan 已提交
357
  SMnodeObj mnodeObj = {0};
S
Shengliang Guan 已提交
358
  mnodeObj.id = pDnode->id;
S
Shengliang Guan 已提交
359 360 361
  mnodeObj.createdTime = taosGetTimestampMs();
  mnodeObj.updateTime = mnodeObj.createdTime;

362
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
363
  if (pTrans == NULL) goto _OVER;
S
Shengliang Guan 已提交
364

S
Shengliang Guan 已提交
365
  mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
366
  mndTransSetSerial(pTrans);
367 368 369
  if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
  if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
  if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
S
Shengliang Guan 已提交
370

371
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
S
Shengliang Guan 已提交
372

373 374
  code = 0;

375
_OVER:
S
Shengliang Guan 已提交
376
  mndTransDrop(pTrans);
377
  return code;
S
Shengliang Guan 已提交
378 379
}

S
Shengliang Guan 已提交
380 381
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
382 383 384 385 386 387
  int32_t          code = -1;
  SMnodeObj       *pObj = NULL;
  SDnodeObj       *pDnode = NULL;
  SUserObj        *pUser = NULL;
  SMCreateMnodeReq createReq = {0};

S
Shengliang Guan 已提交
388
  if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
S
Shengliang Guan 已提交
389
    terrno = TSDB_CODE_INVALID_MSG;
390
    goto _OVER;
S
Shengliang Guan 已提交
391
  }
S
Shengliang Guan 已提交
392

S
Shengliang Guan 已提交
393
  mDebug("mnode:%d, start to create", createReq.dnodeId);
S
Shengliang Guan 已提交
394

395 396 397 398 399
  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) > 3) {
     terrno = TSDB_CODE_MND_TOO_MANY_MNODES;
     goto _OVER;
  }

S
Shengliang Guan 已提交
400
  pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
S
Shengliang Guan 已提交
401
  if (pObj != NULL) {
S
Shengliang Guan 已提交
402
    terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
403
    goto _OVER;
S
Shengliang Guan 已提交
404
  } else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
405
    goto _OVER;
S
Shengliang Guan 已提交
406 407
  }

S
Shengliang Guan 已提交
408
  pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
S
Shengliang Guan 已提交
409 410
  if (pDnode == NULL) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
411
    goto _OVER;
S
Shengliang Guan 已提交
412 413
  }

S
Shengliang Guan 已提交
414
  pUser = mndAcquireUser(pMnode, pReq->conn.user);
S
Shengliang Guan 已提交
415 416
  if (pUser == NULL) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
417
    goto _OVER;
S
Shengliang Guan 已提交
418
  }
S
Shengliang Guan 已提交
419

S
Shengliang Guan 已提交
420
  if (mndCheckNodeAuth(pUser)) {
421
    goto _OVER;
S
Shengliang Guan 已提交
422 423 424
  }

  code = mndCreateMnode(pMnode, pReq, pDnode, &createReq);
S
Shengliang Guan 已提交
425
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
S
Shengliang Guan 已提交
426

427
_OVER:
S
Shengliang Guan 已提交
428
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
429
    mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
S
Shengliang Guan 已提交
430 431
  }

S
Shengliang Guan 已提交
432 433 434 435 436
  mndReleaseMnode(pMnode, pObj);
  mndReleaseDnode(pMnode, pDnode);
  mndReleaseUser(pMnode, pUser);

  return code;
S
Shengliang Guan 已提交
437 438
}

439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
  if (pRedoRaw == NULL) return -1;
  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
  return 0;
}

static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
455
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
S
Shengliang Guan 已提交
456 457 458
  SSdb           *pSdb = pMnode->pSdb;
  void           *pIter = NULL;
  int32_t         numOfReplicas = 0;
S
Shengliang Guan 已提交
459
  SDAlterMnodeReq alterReq = {0};
S
Shengliang Guan 已提交
460 461 462 463
  SDDropMnodeReq  dropReq = {0};
  SEpSet          alterEpset = {0};
  SEpSet          dropEpSet = {0};

S
Shengliang Guan 已提交
464 465 466 467
  while (1) {
    SMnodeObj *pMObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
    if (pIter == NULL) break;
S
Shengliang Guan 已提交
468 469 470 471 472 473 474 475
    if (pMObj->id == pObj->id) {
      sdbRelease(pSdb, pMObj);
      continue;
    }

    alterReq.replicas[numOfReplicas].id = pMObj->id;
    alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
476

S
Shengliang Guan 已提交
477 478 479 480
    alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
    if (pMObj->state == TAOS_SYNC_STATE_LEADER) {
      alterEpset.inUse = numOfReplicas;
S
Shengliang Guan 已提交
481
    }
482

S
Shengliang Guan 已提交
483
    numOfReplicas++;
S
Shengliang Guan 已提交
484 485 486
    sdbRelease(pSdb, pMObj);
  }

S
Shengliang Guan 已提交
487
  alterReq.replica = numOfReplicas;
S
Shengliang Guan 已提交
488
  alterEpset.numOfEps = numOfReplicas;
S
Shengliang Guan 已提交
489

S
Shengliang Guan 已提交
490 491 492 493
  dropReq.dnodeId = pDnode->id;
  dropEpSet.numOfEps = 1;
  dropEpSet.eps[0].port = pDnode->port;
  memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
494

S
Shengliang Guan 已提交
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511
  {
    int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
    void   *pReq = taosMemoryMalloc(contLen);
    tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);

    STransAction action = {
        .epSet = alterEpset,
        .pCont = pReq,
        .contLen = contLen,
        .msgType = TDMT_DND_ALTER_MNODE,
        .acceptableCode = 0,
    };

    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
      taosMemoryFree(pReq);
      return -1;
    }
S
Shengliang Guan 已提交
512 513 514
  }

  {
515
    int32_t contLen = tSerializeSCreateDropMQSBNodeReq(NULL, 0, &dropReq);
wafwerar's avatar
wafwerar 已提交
516
    void   *pReq = taosMemoryMalloc(contLen);
517
    tSerializeSCreateDropMQSBNodeReq(pReq, contLen, &dropReq);
S
Shengliang Guan 已提交
518

S
Shengliang Guan 已提交
519 520 521 522 523 524 525 526
    STransAction action = {
        .epSet = dropEpSet,
        .pCont = pReq,
        .contLen = contLen,
        .msgType = TDMT_DND_DROP_MNODE,
        .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED,
    };

S
Shengliang Guan 已提交
527
    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
wafwerar's avatar
wafwerar 已提交
528
      taosMemoryFree(pReq);
S
Shengliang Guan 已提交
529 530
      return -1;
    }
531 532 533 534 535
  }

  return 0;
}

S
Shengliang Guan 已提交
536
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
537
  int32_t code = -1;
S
Shengliang Guan 已提交
538

539
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
540
  if (pTrans == NULL) goto _OVER;
S
Shengliang Guan 已提交
541

542
  mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
543
  mndTransSetSerial(pTrans);
544 545 546 547
  if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
  if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
  if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
S
Shengliang Guan 已提交
548

549 550
  code = 0;

551
_OVER:
S
Shengliang Guan 已提交
552
  mndTransDrop(pTrans);
553
  return code;
S
Shengliang Guan 已提交
554 555
}

S
Shengliang Guan 已提交
556 557
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
558 559 560 561 562
  int32_t        code = -1;
  SUserObj      *pUser = NULL;
  SMnodeObj     *pObj = NULL;
  SMDropMnodeReq dropReq = {0};

S
Shengliang Guan 已提交
563
  if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
S
Shengliang Guan 已提交
564
    terrno = TSDB_CODE_INVALID_MSG;
565
    goto _OVER;
S
Shengliang Guan 已提交
566
  }
S
Shengliang Guan 已提交
567

S
Shengliang Guan 已提交
568
  mDebug("mnode:%d, start to drop", dropReq.dnodeId);
S
Shengliang Guan 已提交
569

S
Shengliang Guan 已提交
570
  if (dropReq.dnodeId <= 0) {
571 572
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
S
Shengliang Guan 已提交
573 574
  }

S
Shengliang Guan 已提交
575
  pObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
S
Shengliang Guan 已提交
576
  if (pObj == NULL) {
577 578 579
    goto _OVER;
  }

580
  if (pMnode->selfDnodeId == dropReq.dnodeId) {
581 582 583 584 585 586 587
    terrno = TSDB_CODE_MND_CANT_DROP_MASTER;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
    terrno = TSDB_CODE_MND_TOO_FEW_MNODES;
    goto _OVER;
S
Shengliang Guan 已提交
588 589
  }

S
Shengliang Guan 已提交
590
  pUser = mndAcquireUser(pMnode, pReq->conn.user);
S
Shengliang Guan 已提交
591 592
  if (pUser == NULL) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
593
    goto _OVER;
S
Shengliang Guan 已提交
594 595
  }

S
Shengliang Guan 已提交
596
  if (mndCheckNodeAuth(pUser)) {
597
    goto _OVER;
S
Shengliang Guan 已提交
598
  }
S
Shengliang Guan 已提交
599

S
Shengliang Guan 已提交
600
  code = mndDropMnode(pMnode, pReq, pObj);
S
Shengliang Guan 已提交
601
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
S
Shengliang Guan 已提交
602

603
_OVER:
S
Shengliang Guan 已提交
604
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
605 606 607 608 609 610 611
    mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
  }

  mndReleaseMnode(pMnode, pObj);
  mndReleaseUser(pMnode, pUser);

  return code;
S
Shengliang Guan 已提交
612 613
}

S
Shengliang Guan 已提交
614
static int32_t mndProcessCreateMnodeRsp(SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
615
  mndTransProcessRsp(pRsp);
S
Shengliang Guan 已提交
616 617 618
  return 0;
}

S
Shengliang Guan 已提交
619
static int32_t mndProcessAlterMnodeRsp(SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
620
  mndTransProcessRsp(pRsp);
S
Shengliang Guan 已提交
621 622
  return 0;
}
S
Shengliang Guan 已提交
623

S
Shengliang Guan 已提交
624
static int32_t mndProcessDropMnodeRsp(SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
625
  mndTransProcessRsp(pRsp);
S
Shengliang Guan 已提交
626 627
  return 0;
}
S
Shengliang Guan 已提交
628

S
Shengliang Guan 已提交
629 630
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
631 632 633
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
S
Shengliang Guan 已提交
634
  SMnodeObj *pObj = NULL;
S
Shengliang Guan 已提交
635 636 637
  char      *pWrite;

  while (numOfRows < rows) {
S
Shengliang Guan 已提交
638
    pShow->pIter = sdbFetch(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj);
S
Shengliang Guan 已提交
639 640 641
    if (pShow->pIter == NULL) break;

    cols = 0;
S
Shengliang Guan 已提交
642 643
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pObj->id, false);
S
Shengliang Guan 已提交
644

645
    char b1[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
646
    STR_WITH_MAXSIZE_TO_VARSTR(b1, pObj->pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
647

648 649
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, b1, false);
S
Shengliang Guan 已提交
650

651 652
    const char *roles = NULL;
    if (pObj->id == pMnode->selfDnodeId) {
653
      roles = syncStr(TAOS_SYNC_STATE_LEADER);
654 655
    } else {
      roles = syncStr(pObj->state);
656 657
    }
    char *b2 = taosMemoryCalloc(1, 12 + VARSTR_HEADER_SIZE);
658
    STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
659 660

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
S
Shengliang Guan 已提交
661
    colDataAppend(pColInfo, numOfRows, (const char *)b2, false);
S
Shengliang Guan 已提交
662

663 664
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
S
Shengliang Guan 已提交
665 666

    numOfRows++;
S
Shengliang Guan 已提交
667
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
668 669
  }

670
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
671 672 673 674 675 676 677 678

  return numOfRows;
}

static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}
S
Shengliang Guan 已提交
679 680

static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
S
Shengliang Guan 已提交
681
  SMnode         *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707
  SDAlterMnodeReq alterReq = {0};

  if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SNodeInfo *pNode = &cfg.nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, alterReq.replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = alterReq.replicas[i].port;
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) cfg.myIndex = i;
  }

  if (cfg.myIndex == -1) {
    mError("failed to alter mnode since myindex is -1");
    return -1;
  } else {
    mInfo("start to alter mnode sync, replica:%d myindex:%d", cfg.replicaNum, cfg.myIndex);
    for (int32_t i = 0; i < alterReq.replica; ++i) {
      SNodeInfo *pNode = &cfg.nodeInfo[i];
      mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
    }
  }

S
Shengliang Guan 已提交
708 709
  mTrace("trans:-1, sync reconfig will be proposed");

S
Shengliang Guan 已提交
710 711 712 713
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  pMgmt->standby = 0;
  int32_t code = syncReconfig(pMgmt->sync, &cfg);
  if (code != 0) {
S
Shengliang Guan 已提交
714
    mError("trans:-1, failed to propose sync reconfig since %s", terrstr());
S
Shengliang Guan 已提交
715 716 717
    return code;
  } else {
    pMgmt->errCode = 0;
S
Shengliang Guan 已提交
718
    pMgmt->transId = -1;
S
Shengliang Guan 已提交
719 720 721 722 723 724
    tsem_wait(&pMgmt->syncSem);
    mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode));
    terrno = pMgmt->errCode;
    return pMgmt->errCode;
  }
}