mndMnode.c 23.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndMnode.h"
18
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
19 20
#include "mndDnode.h"
#include "mndShow.h"
21
#include "mndSync.h"
S
Shengliang Guan 已提交
22
#include "mndTrans.h"
S
Shengliang Guan 已提交
23
#include "mndUser.h"
S
Shengliang Guan 已提交
24

25 26
#define MNODE_VER_NUMBER   1
#define MNODE_RESERVE_SIZE 64
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
static int32_t  mndCreateDefaultMnode(SMnode *pMnode);
S
Shengliang Guan 已提交
29
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj);
S
Shengliang Guan 已提交
30
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw);
S
Shengliang Guan 已提交
31 32
static int32_t  mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj);
static int32_t  mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj);
S
Shengliang Guan 已提交
33
static int32_t  mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew);
S
Shengliang Guan 已提交
34
static int32_t  mndProcessCreateMnodeReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
35
static int32_t  mndProcessAlterMnodeReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
36 37
static int32_t  mndProcessDropMnodeReq(SRpcMsg *pReq);
static int32_t  mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
38
static void     mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
39 40

int32_t mndInitMnode(SMnode *pMnode) {
S
Shengliang Guan 已提交
41 42 43 44 45 46 47 48 49 50
  SSdbTable table = {
      .sdbType = SDB_MNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultMnode,
      .encodeFp = (SdbEncodeFp)mndMnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndMnodeActionDecode,
      .insertFp = (SdbInsertFp)mndMnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndMnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndMnodeActionDelete,
  };
S
Shengliang Guan 已提交
51

H
Hongze Cheng 已提交
52
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
53
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndTransProcessRsp);
S
Shengliang Guan 已提交
54
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE, mndProcessAlterMnodeReq);
55
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
H
Hongze Cheng 已提交
56
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
57
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);
58 59
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_MNODE_STANDBY_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_VNODE_STANDBY_RSP, mndTransProcessRsp);
S
Shengliang Guan 已提交
60 61 62

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
S
Shengliang Guan 已提交
63 64 65 66 67 68

  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupMnode(SMnode *pMnode) {}

69 70
SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
  SMnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId);
S
Shengliang Guan 已提交
71
  if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
S
Shengliang Guan 已提交
72 73 74
    terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
  }
  return pObj;
S
Shengliang Guan 已提交
75 76
}

77
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
S
Shengliang Guan 已提交
78
  SSdb *pSdb = pMnode->pSdb;
79
  sdbRelease(pMnode->pSdb, pObj);
S
Shengliang Guan 已提交
80 81
}

S
Shengliang Guan 已提交
82 83 84 85 86 87 88 89 90 91
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
  SMnodeObj mnodeObj = {0};
  mnodeObj.id = 1;
  mnodeObj.createdTime = taosGetTimestampMs();
  mnodeObj.updateTime = mnodeObj.createdTime;

  SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj);
  if (pRaw == NULL) return -1;
  sdbSetRawStatus(pRaw, SDB_STATUS_READY);

92
  mInfo("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);
93

94
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
95 96 97 98
  if (pTrans == NULL) {
    mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
    return -1;
  }
99
  mInfo("trans:%d, used to create mnode:%d", pTrans->id, mnodeObj.id);
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115

  if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }

  mndTransDrop(pTrans);
  return 0;
S
Shengliang Guan 已提交
116 117
}

S
Shengliang Guan 已提交
118
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
119 120
  terrno = TSDB_CODE_OUT_OF_MEMORY;

121 122
  SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, MNODE_VER_NUMBER, sizeof(SMnodeObj) + MNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;
S
Shengliang Guan 已提交
123 124

  int32_t dataPos = 0;
125 126 127 128
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)
129 130 131

  terrno = 0;

132
_OVER:
133 134 135 136 137
  if (terrno != 0) {
    mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }
S
Shengliang Guan 已提交
138

139
  mTrace("mnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
S
Shengliang Guan 已提交
140 141 142 143
  return pRaw;
}

static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
144 145
  terrno = TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
146 147 148
  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;

149
  if (sver != MNODE_VER_NUMBER) {
S
Shengliang Guan 已提交
150
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
151
    goto _OVER;
S
Shengliang Guan 已提交
152 153
  }

154
  SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj));
155
  if (pRow == NULL) goto _OVER;
156

S
Shengliang Guan 已提交
157
  SMnodeObj *pObj = sdbGetRowObj(pRow);
158
  if (pObj == NULL) goto _OVER;
S
Shengliang Guan 已提交
159 160

  int32_t dataPos = 0;
161 162 163 164
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)
165 166 167

  terrno = 0;

168
_OVER:
169 170
  if (terrno != 0) {
    mError("mnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
171
    taosMemoryFreeClear(pRow);
172 173
    return NULL;
  }
S
Shengliang Guan 已提交
174

175
  mTrace("mnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
S
Shengliang Guan 已提交
176 177 178
  return pRow;
}

S
Shengliang Guan 已提交
179
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
180
  mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);
S
Shengliang Guan 已提交
181 182
  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode == NULL) {
S
Shengliang Guan 已提交
183
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
S
Shengliang Guan 已提交
184
    mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
S
Shengliang Guan 已提交
185 186 187
    return -1;
  }

188
  pObj->state = TAOS_SYNC_STATE_ERROR;
S
Shengliang Guan 已提交
189 190 191
  return 0;
}

S
Shengliang Guan 已提交
192
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
193
  mTrace("mnode:%d, perform delete action, row:%p", pObj->id, pObj);
S
Shengliang Guan 已提交
194 195 196
  if (pObj->pDnode != NULL) {
    sdbRelease(pSdb, pObj->pDnode);
    pObj->pDnode = NULL;
S
Shengliang Guan 已提交
197 198 199 200 201
  }

  return 0;
}

S
Shengliang Guan 已提交
202
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
S
Shengliang Guan 已提交
203
  mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
S
Shengliang Guan 已提交
204
  pOld->updateTime = pNew->updateTime;
S
Shengliang Guan 已提交
205
  return 0;
S
Shengliang Guan 已提交
206 207 208 209 210
}

bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb *pSdb = pMnode->pSdb;

S
Shengliang Guan 已提交
211 212
  SMnodeObj *pObj = sdbAcquire(pSdb, SDB_MNODE, &dnodeId);
  if (pObj == NULL) {
S
Shengliang Guan 已提交
213 214 215
    return false;
  }

S
Shengliang Guan 已提交
216
  sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
217
  return true;
S
Shengliang Guan 已提交
218 219
}

S
Shengliang Guan 已提交
220
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
221 222 223
  SSdb   *pSdb = pMnode->pSdb;
  int32_t totalMnodes = sdbGetSize(pSdb, SDB_MNODE);
  void   *pIter = NULL;
S
Shengliang Guan 已提交
224 225

  while (1) {
S
Shengliang Guan 已提交
226 227
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
S
Shengliang Guan 已提交
228
    if (pIter == NULL) break;
229 230 231

    if (pObj->id == pMnode->selfDnodeId) {
      if (mndIsMaster(pMnode)) {
232
        pEpSet->inUse = pEpSet->numOfEps;
233 234
      } else {
        pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes;
235
      }
S
Shengliang Guan 已提交
236
    }
237 238
    addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
239
  }
240 241 242 243

  if (pEpSet->numOfEps == 0) {
    syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet);
  }
S
Shengliang Guan 已提交
244 245
}

246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
  if (pRedoRaw == NULL) return -1;
  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
  return 0;
}

static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj);
  if (pUndoRaw == NULL) return -1;
  if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
  if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
270
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
S
Shengliang Guan 已提交
271 272 273 274
  SSdb            *pSdb = pMnode->pSdb;
  void            *pIter = NULL;
  int32_t          numOfReplicas = 0;
  SDAlterMnodeReq  alterReq = {0};
S
Shengliang Guan 已提交
275
  SDCreateMnodeReq createReq = {0};
S
Shengliang Guan 已提交
276 277 278
  SEpSet           alterEpset = {0};
  SEpSet           createEpset = {0};

S
Shengliang Guan 已提交
279 280 281 282
  while (1) {
    SMnodeObj *pMObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
    if (pIter == NULL) break;
283

S
Shengliang Guan 已提交
284 285 286
    alterReq.replicas[numOfReplicas].id = pMObj->id;
    alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
287

S
Shengliang Guan 已提交
288 289 290 291 292 293 294
    alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
    if (pMObj->state == TAOS_SYNC_STATE_LEADER) {
      alterEpset.inUse = numOfReplicas;
    }

    numOfReplicas++;
S
Shengliang Guan 已提交
295 296 297
    sdbRelease(pSdb, pMObj);
  }

S
Shengliang Guan 已提交
298 299 300 301
  alterReq.replica = numOfReplicas + 1;
  alterReq.replicas[numOfReplicas].id = pDnode->id;
  alterReq.replicas[numOfReplicas].port = pDnode->port;
  memcpy(alterReq.replicas[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
302

S
Shengliang Guan 已提交
303 304 305
  alterEpset.numOfEps = numOfReplicas + 1;
  alterEpset.eps[numOfReplicas].port = pDnode->port;
  memcpy(alterEpset.eps[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
306

S
Shengliang Guan 已提交
307 308 309 310
  createReq.replica = 1;
  createReq.replicas[0].id = pDnode->id;
  createReq.replicas[0].port = pDnode->port;
  memcpy(createReq.replicas[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
311

S
Shengliang Guan 已提交
312 313 314
  createEpset.numOfEps = 1;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
315

S
Shengliang Guan 已提交
316
  {
317
    int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
wafwerar's avatar
wafwerar 已提交
318
    void   *pReq = taosMemoryMalloc(contLen);
319
    tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);
S
Shengliang Guan 已提交
320

S
Shengliang Guan 已提交
321
    STransAction action = {
322
        .epSet = createEpset,
S
Shengliang Guan 已提交
323 324
        .pCont = pReq,
        .contLen = contLen,
325 326
        .msgType = TDMT_DND_CREATE_MNODE,
        .acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED,
S
Shengliang Guan 已提交
327
    };
S
Shengliang Guan 已提交
328 329

    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
wafwerar's avatar
wafwerar 已提交
330
      taosMemoryFree(pReq);
S
Shengliang Guan 已提交
331 332 333 334 335
      return -1;
    }
  }

  {
336
    int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
wafwerar's avatar
wafwerar 已提交
337
    void   *pReq = taosMemoryMalloc(contLen);
338
    tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
S
Shengliang Guan 已提交
339

S
Shengliang Guan 已提交
340
    STransAction action = {
341
        .epSet = alterEpset,
S
Shengliang Guan 已提交
342 343
        .pCont = pReq,
        .contLen = contLen,
S
Shengliang Guan 已提交
344
        .msgType = TDMT_MND_ALTER_MNODE,
345
        .acceptableCode = 0,
S
Shengliang Guan 已提交
346 347
    };

S
Shengliang Guan 已提交
348
    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
wafwerar's avatar
wafwerar 已提交
349
      taosMemoryFree(pReq);
S
Shengliang Guan 已提交
350 351
      return -1;
    }
352 353 354 355 356
  }

  return 0;
}

S
Shengliang Guan 已提交
357
static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
S
Shengliang Guan 已提交
358 359
  int32_t code = -1;

S
Shengliang Guan 已提交
360
  SMnodeObj mnodeObj = {0};
S
Shengliang Guan 已提交
361
  mnodeObj.id = pDnode->id;
S
Shengliang Guan 已提交
362 363 364
  mnodeObj.createdTime = taosGetTimestampMs();
  mnodeObj.updateTime = mnodeObj.createdTime;

365
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
366
  if (pTrans == NULL) goto _OVER;
367
  mndTransSetSerial(pTrans);
368
  mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
S
Shengliang Guan 已提交
369

370 371 372
  if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
  if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
  if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
373
  if (mndTransAppendNullLog(pTrans) != 0) goto _OVER;
374
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
S
Shengliang Guan 已提交
375

376 377
  code = 0;

378
_OVER:
S
Shengliang Guan 已提交
379
  mndTransDrop(pTrans);
380
  return code;
S
Shengliang Guan 已提交
381 382
}

S
Shengliang Guan 已提交
383 384
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
385 386 387 388 389
  int32_t          code = -1;
  SMnodeObj       *pObj = NULL;
  SDnodeObj       *pDnode = NULL;
  SMCreateMnodeReq createReq = {0};

S
Shengliang Guan 已提交
390
  if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
S
Shengliang Guan 已提交
391
    terrno = TSDB_CODE_INVALID_MSG;
392
    goto _OVER;
S
Shengliang Guan 已提交
393
  }
S
Shengliang Guan 已提交
394

395
  mInfo("mnode:%d, start to create", createReq.dnodeId);
396
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) {
S
Shengliang Guan 已提交
397 398
    goto _OVER;
  }
S
Shengliang Guan 已提交
399

S
Shengliang Guan 已提交
400
  pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
S
Shengliang Guan 已提交
401
  if (pObj != NULL) {
S
Shengliang Guan 已提交
402
    terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
403
    goto _OVER;
S
Shengliang Guan 已提交
404
  } else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
405
    goto _OVER;
S
Shengliang Guan 已提交
406 407
  }

S
Shengliang Guan 已提交
408
  pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
S
Shengliang Guan 已提交
409 410
  if (pDnode == NULL) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
411
    goto _OVER;
S
Shengliang Guan 已提交
412 413
  }

414 415 416 417 418
  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
    terrno = TSDB_CODE_MND_TOO_MANY_MNODES;
    goto _OVER;
  }

S
Shengliang Guan 已提交
419
  if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
420 421 422 423
    terrno = TSDB_CODE_NODE_OFFLINE;
    goto _OVER;
  }

S
Shengliang Guan 已提交
424
  code = mndCreateMnode(pMnode, pReq, pDnode, &createReq);
S
Shengliang Guan 已提交
425
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
S
Shengliang Guan 已提交
426

427
_OVER:
S
Shengliang Guan 已提交
428
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
429
    mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
S
Shengliang Guan 已提交
430 431
  }

S
Shengliang Guan 已提交
432 433 434 435
  mndReleaseMnode(pMnode, pObj);
  mndReleaseDnode(pMnode, pDnode);

  return code;
S
Shengliang Guan 已提交
436 437
}

438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
  if (pRedoRaw == NULL) return -1;
  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
  return 0;
}

static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
454
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
S
Shengliang Guan 已提交
455 456 457
  SSdb           *pSdb = pMnode->pSdb;
  void           *pIter = NULL;
  int32_t         numOfReplicas = 0;
S
Shengliang Guan 已提交
458
  SDAlterMnodeReq alterReq = {0};
S
Shengliang Guan 已提交
459
  SDDropMnodeReq  dropReq = {0};
460
  SSetStandbyReq  standbyReq = {0};
S
Shengliang Guan 已提交
461 462 463
  SEpSet          alterEpset = {0};
  SEpSet          dropEpSet = {0};

S
Shengliang Guan 已提交
464 465 466 467
  while (1) {
    SMnodeObj *pMObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
    if (pIter == NULL) break;
S
Shengliang Guan 已提交
468 469 470 471 472 473 474 475
    if (pMObj->id == pObj->id) {
      sdbRelease(pSdb, pMObj);
      continue;
    }

    alterReq.replicas[numOfReplicas].id = pMObj->id;
    alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
476

S
Shengliang Guan 已提交
477 478 479 480
    alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
    if (pMObj->state == TAOS_SYNC_STATE_LEADER) {
      alterEpset.inUse = numOfReplicas;
S
Shengliang Guan 已提交
481
    }
482

S
Shengliang Guan 已提交
483
    numOfReplicas++;
S
Shengliang Guan 已提交
484 485 486
    sdbRelease(pSdb, pMObj);
  }

S
Shengliang Guan 已提交
487
  alterReq.replica = numOfReplicas;
S
Shengliang Guan 已提交
488
  alterEpset.numOfEps = numOfReplicas;
S
Shengliang Guan 已提交
489

S
Shengliang Guan 已提交
490 491 492 493
  dropReq.dnodeId = pDnode->id;
  dropEpSet.numOfEps = 1;
  dropEpSet.eps[0].port = pDnode->port;
  memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
494

495 496 497 498 499 500
  standbyReq.dnodeId = pDnode->id;
  standbyReq.standby = 1;

  {
    int32_t contLen = tSerializeSSetStandbyReq(NULL, 0, &standbyReq) + sizeof(SMsgHead);
    void   *pReq = taosMemoryMalloc(contLen);
S
Shengliang Guan 已提交
501
    tSerializeSSetStandbyReq((char *)pReq + sizeof(SMsgHead), contLen, &standbyReq);
502 503 504 505 506 507 508 509
    SMsgHead *pHead = pReq;
    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(MNODE_HANDLE);

    STransAction action = {
        .epSet = dropEpSet,
        .pCont = pReq,
        .contLen = contLen,
510
        .msgType = TDMT_SYNC_SET_MNODE_STANDBY,
511 512 513 514 515 516 517 518 519
        .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED,
    };

    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
      taosMemoryFree(pReq);
      return -1;
    }
  }

S
Shengliang Guan 已提交
520 521 522 523 524 525 526 527 528
  {
    int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
    void   *pReq = taosMemoryMalloc(contLen);
    tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);

    STransAction action = {
        .epSet = alterEpset,
        .pCont = pReq,
        .contLen = contLen,
S
Shengliang Guan 已提交
529
        .msgType = TDMT_MND_ALTER_MNODE,
S
Shengliang Guan 已提交
530 531 532 533 534 535 536
        .acceptableCode = 0,
    };

    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
      taosMemoryFree(pReq);
      return -1;
    }
S
Shengliang Guan 已提交
537 538 539
  }

  {
540
    int32_t contLen = tSerializeSCreateDropMQSBNodeReq(NULL, 0, &dropReq);
wafwerar's avatar
wafwerar 已提交
541
    void   *pReq = taosMemoryMalloc(contLen);
542
    tSerializeSCreateDropMQSBNodeReq(pReq, contLen, &dropReq);
S
Shengliang Guan 已提交
543

S
Shengliang Guan 已提交
544 545 546 547 548 549 550 551
    STransAction action = {
        .epSet = dropEpSet,
        .pCont = pReq,
        .contLen = contLen,
        .msgType = TDMT_DND_DROP_MNODE,
        .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED,
    };

S
Shengliang Guan 已提交
552
    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
wafwerar's avatar
wafwerar 已提交
553
      taosMemoryFree(pReq);
S
Shengliang Guan 已提交
554 555
      return -1;
    }
556 557 558 559 560
  }

  return 0;
}

S
Shengliang Guan 已提交
561
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
562
  if (pObj == NULL) return 0;
S
Shengliang Guan 已提交
563 564 565 566 567 568 569
  if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1;
  if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) return -1;
  if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) return -1;
  if (mndTransAppendNullLog(pTrans) != 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
570
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
571
  int32_t code = -1;
S
Shengliang Guan 已提交
572
  STrans *pTrans = NULL;
S
Shengliang Guan 已提交
573

S
Shengliang Guan 已提交
574
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
575
  if (pTrans == NULL) goto _OVER;
S
Shengliang Guan 已提交
576
  mndTransSetSerial(pTrans);
577
  mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
S
Shengliang Guan 已提交
578

S
Shengliang Guan 已提交
579
  if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj) != 0) goto _OVER;
580
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
S
Shengliang Guan 已提交
581

582 583
  code = 0;

584
_OVER:
S
Shengliang Guan 已提交
585
  mndTransDrop(pTrans);
586
  return code;
S
Shengliang Guan 已提交
587 588
}

S
Shengliang Guan 已提交
589 590
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
591 592 593 594
  int32_t        code = -1;
  SMnodeObj     *pObj = NULL;
  SMDropMnodeReq dropReq = {0};

S
Shengliang Guan 已提交
595
  if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
S
Shengliang Guan 已提交
596
    terrno = TSDB_CODE_INVALID_MSG;
597
    goto _OVER;
S
Shengliang Guan 已提交
598
  }
S
Shengliang Guan 已提交
599

600
  mInfo("mnode:%d, start to drop", dropReq.dnodeId);
601
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
S
Shengliang Guan 已提交
602 603
    goto _OVER;
  }
S
Shengliang Guan 已提交
604

S
Shengliang Guan 已提交
605
  if (dropReq.dnodeId <= 0) {
606 607
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
S
Shengliang Guan 已提交
608 609
  }

S
Shengliang Guan 已提交
610
  pObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
S
Shengliang Guan 已提交
611
  if (pObj == NULL) {
612 613 614
    goto _OVER;
  }

615
  if (pMnode->selfDnodeId == dropReq.dnodeId) {
S
Shengliang Guan 已提交
616
    terrno = TSDB_CODE_MND_CANT_DROP_LEADER;
617 618 619 620 621 622
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
    terrno = TSDB_CODE_MND_TOO_FEW_MNODES;
    goto _OVER;
S
Shengliang Guan 已提交
623 624
  }

625 626 627 628 629
  if (!mndIsDnodeOnline(pObj->pDnode, taosGetTimestampMs())) {
    terrno = TSDB_CODE_NODE_OFFLINE;
    goto _OVER;
  }

S
Shengliang Guan 已提交
630
  code = mndDropMnode(pMnode, pReq, pObj);
S
Shengliang Guan 已提交
631
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
S
Shengliang Guan 已提交
632

633
_OVER:
S
Shengliang Guan 已提交
634
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
635 636 637 638 639
    mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
  }

  mndReleaseMnode(pMnode, pObj);
  return code;
S
Shengliang Guan 已提交
640 641
}

S
Shengliang Guan 已提交
642 643
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
644 645 646
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
S
Shengliang Guan 已提交
647
  SMnodeObj *pObj = NULL;
648
  ESdbStatus objStatus = 0;
S
Shengliang Guan 已提交
649
  char      *pWrite;
650
  int64_t    curMs = taosGetTimestampMs();
S
Shengliang Guan 已提交
651 652

  while (numOfRows < rows) {
653
    pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus);
S
Shengliang Guan 已提交
654 655 656
    if (pShow->pIter == NULL) break;

    cols = 0;
S
Shengliang Guan 已提交
657 658
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pObj->id, false);
S
Shengliang Guan 已提交
659

660
    char b1[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
661
    STR_WITH_MAXSIZE_TO_VARSTR(b1, pObj->pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
662

663 664
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, b1, false);
S
Shengliang Guan 已提交
665

666
    const char *roles = "offline";
667
    if (pObj->id == pMnode->selfDnodeId) {
668 669
      roles = syncStr(TAOS_SYNC_STATE_LEADER);
    }
S
Shengliang Guan 已提交
670
    if (pObj->pDnode && mndIsDnodeOnline(pObj->pDnode, curMs)) {
671
      roles = syncStr(pObj->state);
S
Shengliang Guan 已提交
672 673 674 675
      if (pObj->state == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) {
        roles = syncStr(TAOS_SYNC_STATE_ERROR);
        mError("mnode:%d, is leader too", pObj->id);
      }
676 677
    }
    char b2[12 + VARSTR_HEADER_SIZE] = {0};
678
    STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
679
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
S
Shengliang Guan 已提交
680
    colDataAppend(pColInfo, numOfRows, (const char *)b2, false);
S
Shengliang Guan 已提交
681

682 683 684
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
685 686 687 688 689
    char b3[9 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(b3, status, pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)b3, false);

690 691
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
S
Shengliang Guan 已提交
692 693

    numOfRows++;
S
Shengliang Guan 已提交
694
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
695 696
  }

697
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
698 699 700 701 702 703 704 705

  return numOfRows;
}

static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}
S
Shengliang Guan 已提交
706 707

static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
S
Shengliang Guan 已提交
708
  SMnode         *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734
  SDAlterMnodeReq alterReq = {0};

  if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SNodeInfo *pNode = &cfg.nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, alterReq.replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = alterReq.replicas[i].port;
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) cfg.myIndex = i;
  }

  if (cfg.myIndex == -1) {
    mError("failed to alter mnode since myindex is -1");
    return -1;
  } else {
    mInfo("start to alter mnode sync, replica:%d myindex:%d", cfg.replicaNum, cfg.myIndex);
    for (int32_t i = 0; i < alterReq.replica; ++i) {
      SNodeInfo *pNode = &cfg.nodeInfo[i];
      mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
    }
  }

735
  mInfo("trans:-1, sync reconfig will be proposed");
S
Shengliang Guan 已提交
736

S
Shengliang Guan 已提交
737 738 739 740
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  pMgmt->standby = 0;
  int32_t code = syncReconfig(pMgmt->sync, &cfg);
  if (code != 0) {
S
Shengliang Guan 已提交
741
    mError("trans:-1, failed to propose sync reconfig since %s", terrstr());
S
Shengliang Guan 已提交
742 743 744
    return code;
  } else {
    pMgmt->errCode = 0;
745
    taosWLockLatch(&pMgmt->lock);
S
Shengliang Guan 已提交
746
    pMgmt->transId = -1;
747
    taosWUnLockLatch(&pMgmt->lock);
S
Shengliang Guan 已提交
748
    tsem_wait(&pMgmt->syncSem);
S
Shengliang Guan 已提交
749
    mInfo("alter mnode sync result:0x%x %s", pMgmt->errCode, tstrerror(pMgmt->errCode));
S
Shengliang Guan 已提交
750 751 752 753
    terrno = pMgmt->errCode;
    return pMgmt->errCode;
  }
}