mndMnode.c 23.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndMnode.h"
18
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
19 20
#include "mndDnode.h"
#include "mndShow.h"
21
#include "mndSync.h"
S
Shengliang Guan 已提交
22
#include "mndTrans.h"
S
Shengliang Guan 已提交
23
#include "mndUser.h"
S
Shengliang Guan 已提交
24

25 26
// Soft version stamped on encoded mnode records (passed to sdbAllocRaw) and
// required to match on decode.
#define MNODE_VER_NUMBER   1
// Number of reserve bytes added to each encoded mnode record
// (see SDB_SET_RESERVE / SDB_GET_RESERVE usage below).
#define MNODE_RESERVE_SIZE 64
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
static int32_t  mndCreateDefaultMnode(SMnode *pMnode);
S
Shengliang Guan 已提交
29
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj);
S
Shengliang Guan 已提交
30
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw);
S
Shengliang Guan 已提交
31 32
static int32_t  mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj);
static int32_t  mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj);
S
Shengliang Guan 已提交
33
static int32_t  mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew);
S
Shengliang Guan 已提交
34
static int32_t  mndProcessCreateMnodeReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
35
static int32_t  mndProcessAlterMnodeReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
36 37
static int32_t  mndProcessDropMnodeReq(SRpcMsg *pReq);
static int32_t  mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
38
static void     mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
39 40

// Registers everything the mnode module contributes to pMnode:
//   - request/response handlers for create, alter and drop mnode messages,
//   - the retrieve / free-iterator callbacks for TSDB_MGMT_TABLE_MNODE,
//   - the SDB_MNODE table description (int32 key plus the action callbacks).
// Returns the result of sdbSetTable().
int32_t mndInitMnode(SMnode *pMnode) {
  SSdbTable mnodeTable = {0};
  mnodeTable.sdbType = SDB_MNODE;
  mnodeTable.keyType = SDB_KEY_INT32;
  mnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultMnode;
  mnodeTable.encodeFp = (SdbEncodeFp)mndMnodeActionEncode;
  mnodeTable.decodeFp = (SdbDecodeFp)mndMnodeActionDecode;
  mnodeTable.insertFp = (SdbInsertFp)mndMnodeActionInsert;
  mnodeTable.updateFp = (SdbUpdateFp)mndMnodeActionUpdate;
  mnodeTable.deleteFp = (SdbDeleteFp)mndMnodeActionDelete;

  // Requests are handled locally; responses are routed to the transaction engine.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE, mndProcessAlterMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_MNODE_STANDBY_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_VNODE_STANDBY_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);

  return sdbSetTable(pMnode->pSdb, mnodeTable);
}

// Counterpart of mndInitMnode(); currently there is nothing to tear down.
void mndCleanupMnode(SMnode *pMnode) {
  (void)pMnode;
}

69 70
// Looks up the mnode object keyed by mnodeId in the SDB_MNODE table.
// Returns the acquired object, or NULL on failure. When the failure is the
// generic TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to the
// mnode-specific TSDB_CODE_MND_MNODE_NOT_EXIST; other terrno values are kept.
// A non-NULL result is handed back with mndReleaseMnode().
SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
  SMnodeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
  }
  return NULL;
}

77
// Releases an mnode object previously returned by mndAcquireMnode() by
// forwarding it to sdbRelease(). Callers in this file also pass NULL here
// (e.g. on early-exit paths), so sdbRelease() is presumably NULL-tolerant.
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}

S
Shengliang Guan 已提交
82 83 84 85 86 87 88 89
// Deploy callback for the SDB_MNODE table: creates the first mnode (id 1)
// through a "create-mnode" transaction whose only content is one commit log
// in READY status.
// Returns 0 on success, -1 on failure (terrno is whatever the failing callee
// set).
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
  int32_t code = -1;

  SMnodeObj firstMnode = {0};
  firstMnode.id = 1;
  firstMnode.createdTime = taosGetTimestampMs();
  firstMnode.updateTime = firstMnode.createdTime;

  SSdbRaw *pRaw = mndMnodeActionEncode(&firstMnode);
  if (pRaw == NULL) {
    return -1;
  }
  (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);

  mInfo("mnode:%d, will be created when deploying, raw:%p", firstMnode.id, pRaw);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-mnode");
  if (pTrans == NULL) {
    // No transaction exists yet, so the raw record is freed here.
    sdbFreeRaw(pRaw);
    mError("mnode:%d, failed to create since %s", firstMnode.id, terrstr());
    return -1;
  }
  mInfo("trans:%d, used to create mnode:%d", pTrans->id, firstMnode.id);

  if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    goto _OVER;
  }
  (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto _OVER;
  }

  code = 0;

_OVER:
  // The local transaction handle is dropped on success and failure alike.
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
119
// Serializes an SMnodeObj into a newly allocated SSdbRaw.
// Layout, in order: id (int32), createdTime (int64), updateTime (int64),
// followed by MNODE_RESERVE_SIZE reserve bytes.
// Returns the raw record with terrno == 0, or NULL with terrno set.
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  // Pessimistic default; cleared only once every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_MNODE, MNODE_VER_NUMBER, sizeof(SMnodeObj) + MNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("mnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

// Deserializes an SSdbRaw produced by mndMnodeActionEncode() into a newly
// allocated SSdbRow that wraps an SMnodeObj.
// Returns the row with terrno == 0, or NULL with terrno set (for example
// TSDB_CODE_SDB_INVALID_DATA_VER when the stored soft version differs from
// MNODE_VER_NUMBER).
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
  // Declared and NULL-initialised before the first `goto _OVER`, so the error
  // path below never reads an indeterminate pointer.
  SSdbRow   *pRow = NULL;
  SMnodeObj *pObj = NULL;
  int32_t    dataPos = 0;
  int8_t     sver = 0;

  // Pessimistic default; cleared only once every field has been read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;

  if (sver != MNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SMnodeObj));
  if (pRow == NULL) goto _OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    // pObj is still NULL when the failure happened before the row object was
    // obtained; log id 0 in that case instead of dereferencing it.
    mError("mnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("mnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRow;
}

S
Shengliang Guan 已提交
180
// Insert callback for SDB_MNODE rows: binds the mnode to the dnode that has
// the same id by acquiring it from SDB_DNODE, and sets the initial sync state
// to TAOS_SYNC_STATE_ERROR.
// Returns 0 on success; -1 with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST when
// no such dnode can be acquired.
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
  mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);

  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode != NULL) {
    pObj->state = TAOS_SYNC_STATE_ERROR;
    return 0;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
193
// Delete callback for SDB_MNODE rows: releases the dnode reference taken by
// the insert action (if any) and clears the pointer. Always returns 0.
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
  mTrace("mnode:%d, perform delete action, row:%p", pObj->id, pObj);

  if (pObj->pDnode == NULL) {
    return 0;
  }

  sdbRelease(pSdb, pObj->pDnode);
  pObj->pDnode = NULL;
  return 0;
}

S
Shengliang Guan 已提交
203
// Update callback for SDB_MNODE rows: the only field carried over from the
// new row into the existing one is updateTime. Always returns 0.
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
  mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;

  return 0;
}

// Reports whether an SDB_MNODE row keyed by dnodeId can be acquired.
// The object is acquired only to test for existence and is released again
// before returning.
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pProbe = sdbAcquire(pSdb, SDB_MNODE, &dnodeId);
  bool       exists = (pProbe != NULL);

  if (exists) {
    sdbRelease(pSdb, pProbe);
  }
  return exists;
}

S
Shengliang Guan 已提交
221
// Fills pEpSet with the endpoint (fqdn, port) of every mnode whose dnode
// pointer is set, and picks pEpSet->inUse when the local mnode
// (id == pMnode->selfDnodeId) is visited:
//   - local node is master: the index the next added endpoint will get,
//   - otherwise: the index after that, wrapped by the mnode count.
// Falls back to syncGetRetryEpSet() when the table is empty or when no
// endpoint was added.
// NOTE(review): pEpSet is only appended to here; callers presumably pass it
// zero-initialised - confirm.
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
  SSdb         *pSdb = pMnode->pSdb;
  const int32_t mnodeCount = sdbGetSize(pSdb, SDB_MNODE);

  if (mnodeCount == 0) {
    syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet);
    return;
  }

  void *pIter = NULL;
  for (;;) {
    SMnodeObj *pCur = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pCur);
    if (pIter == NULL) {
      break;
    }

    if (pCur->id == pMnode->selfDnodeId) {
      // numOfEps has not yet been bumped for this node at this point.
      pEpSet->inUse = mndIsMaster(pMnode) ? pEpSet->numOfEps : (pEpSet->numOfEps + 1) % mnodeCount;
    }

    if (pCur->pDnode != NULL) {
      addEpIntoEpSet(pEpSet, pCur->pDnode->fqdn, pCur->pDnode->port);
    }

    sdbRelease(pSdb, pCur);
  }

  if (pEpSet->numOfEps == 0) {
    syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet);
  }
}

253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
// Encodes pObj, appends it to pTrans as a redo log and then marks the raw
// record SDB_STATUS_CREATING. Returns 0 on success, -1 as soon as any of the
// three steps fails.
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_CREATING) != 0) {
    return -1;
  }
  return 0;
}

// Encodes pObj, appends it to pTrans as an undo log and then marks the raw
// record SDB_STATUS_DROPPED. Returns 0 on success, -1 as soon as any of the
// three steps fails.
static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  if (pRaw == NULL || mndTransAppendUndolog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

// Encodes pObj, appends it to pTrans as a commit log and then marks the raw
// record SDB_STATUS_READY. Returns 0 on success, -1 as soon as any of the
// three steps fails.
static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
277
// Appends the two redo actions needed to add a new mnode on pDnode:
//   1. TDMT_DND_CREATE_MNODE, sent to the new dnode, with a single-replica
//      request describing that dnode
//      (TSDB_CODE_NODE_ALREADY_DEPLOYED is an acceptable reply);
//   2. TDMT_MND_ALTER_MNODE, sent to the endpoints of the existing mnodes,
//      carrying the existing replicas plus the new one.
// pObj is not used by this function.
// Returns 0 on success; -1 when a request buffer cannot be allocated
// (terrno = TSDB_CODE_OUT_OF_MEMORY) or when appending an action fails.
// NOTE(review): the request buffer is freed here only when the append fails;
// on success the transaction presumably owns it - confirm.
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb            *pSdb = pMnode->pSdb;
  void            *pIter = NULL;
  int32_t          numOfReplicas = 0;
  SDAlterMnodeReq  alterReq = {0};
  SDCreateMnodeReq createReq = {0};
  SEpSet           alterEpset = {0};
  SEpSet           createEpset = {0};

  // Collect every existing mnode into the alter request and its ep set.
  while (1) {
    SMnodeObj *pMObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
    if (pIter == NULL) break;

    alterReq.replicas[numOfReplicas].id = pMObj->id;
    alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);

    alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
    if (pMObj->state == TAOS_SYNC_STATE_LEADER) {
      alterEpset.inUse = numOfReplicas;
    }

    numOfReplicas++;
    sdbRelease(pSdb, pMObj);
  }

  // The new dnode becomes the last replica of the altered configuration.
  alterReq.replica = numOfReplicas + 1;
  alterReq.replicas[numOfReplicas].id = pDnode->id;
  alterReq.replicas[numOfReplicas].port = pDnode->port;
  memcpy(alterReq.replicas[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  alterEpset.numOfEps = numOfReplicas + 1;
  alterEpset.eps[numOfReplicas].port = pDnode->port;
  memcpy(alterEpset.eps[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  createReq.replica = 1;
  createReq.replicas[0].id = pDnode->id;
  createReq.replicas[0].port = pDnode->port;
  memcpy(createReq.replicas[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  createEpset.numOfEps = 1;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  {
    int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
    void   *pReq = taosMemoryMalloc(contLen);
    if (pReq == NULL) {
      // Never hand a NULL buffer to the serializer.
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);

    STransAction action = {
        .epSet = createEpset,
        .pCont = pReq,
        .contLen = contLen,
        .msgType = TDMT_DND_CREATE_MNODE,
        .acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED,
    };

    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
      taosMemoryFree(pReq);
      return -1;
    }
  }

  {
    int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
    void   *pReq = taosMemoryMalloc(contLen);
    if (pReq == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);

    STransAction action = {
        .epSet = alterEpset,
        .pCont = pReq,
        .contLen = contLen,
        .msgType = TDMT_MND_ALTER_MNODE,
        .acceptableCode = 0,
    };

    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
      taosMemoryFree(pReq);
      return -1;
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
364
// Builds and prepares the serial "create-mnode" transaction for the mnode to
// be hosted on pDnode: redo log, commit log, redo actions, a null log, then
// mndTransPrepare().
// Returns 0 when the transaction was prepared, -1 otherwise. The local
// transaction handle is dropped before returning in both cases.
static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
  int32_t code = -1;

  SMnodeObj newMnode = {0};
  newMnode.id = pDnode->id;
  newMnode.createdTime = taosGetTimestampMs();
  newMnode.updateTime = newMnode.createdTime;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "create-mnode");
  if (pTrans != NULL) {
    mndTransSetSerial(pTrans);
    mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);

    // Short-circuit evaluation keeps the original step order and stops at the
    // first failing step.
    if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &newMnode) == 0 &&
        mndSetCreateMnodeCommitLogs(pMnode, pTrans, &newMnode) == 0 &&
        mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &newMnode) == 0 &&
        mndTransAppendNullLog(pTrans) == 0 &&
        mndTransPrepare(pMnode, pTrans) == 0) {
      code = 0;
    }
  }

  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
390 391
// Handler for TDMT_MND_CREATE_MNODE.
// Validation order: decode request -> privilege check -> mnode must not exist
// yet -> dnode must exist -> fewer than 3 mnodes -> dnode must be online.
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been started,
// otherwise the failure code (terrno describes the reason).
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SMCreateMnodeReq req = {0};
  SMnodeObj       *pExisting = NULL;
  SDnodeObj       *pDnode = NULL;
  int32_t          code = -1;

  if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("mnode:%d, start to create", req.dnodeId);
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) {
    goto _OVER;
  }

  pExisting = mndAcquireMnode(pMnode, req.dnodeId);
  if (pExisting != NULL) {
    terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
    goto _OVER;
  }
  // Only "mnode does not exist" lets the request continue; any other lookup
  // error aborts it.
  if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
    goto _OVER;
  }

  pDnode = mndAcquireDnode(pMnode, req.dnodeId);
  if (pDnode == NULL) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
    terrno = TSDB_CODE_MND_TOO_MANY_MNODES;
    goto _OVER;
  }

  if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
    terrno = TSDB_CODE_NODE_OFFLINE;
    goto _OVER;
  }

  code = mndCreateMnode(pMnode, pReq, pDnode, &req);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("mnode:%d, failed to create since %s", req.dnodeId, terrstr());
  }

  mndReleaseMnode(pMnode, pExisting);
  mndReleaseDnode(pMnode, pDnode);

  return code;
}

445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460
// Encodes pObj, appends it to pTrans as a redo log and then marks the raw
// record SDB_STATUS_DROPPING. Returns 0 on success, -1 as soon as any of the
// three steps fails.
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING) != 0) {
    return -1;
  }
  return 0;
}

// Encodes pObj, appends it to pTrans as a commit log and then marks the raw
// record SDB_STATUS_DROPPED. Returns 0 on success, -1 as soon as any of the
// three steps fails.
static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
461
// Appends the three redo actions needed to remove the mnode pObj hosted on
// pDnode, in this order:
//   1. TDMT_SYNC_SET_MNODE_STANDBY, sent to the dnode being dropped
//      (SMsgHead followed by the serialized SSetStandbyReq);
//   2. TDMT_MND_ALTER_MNODE, sent to the remaining mnodes, carrying the
//      replica set without pObj;
//   3. TDMT_DND_DROP_MNODE, sent to the dnode being dropped.
// TSDB_CODE_NODE_NOT_DEPLOYED is an acceptable reply for actions 1 and 3.
// Returns 0 on success; -1 when a request buffer cannot be allocated
// (terrno = TSDB_CODE_OUT_OF_MEMORY) or when appending an action fails.
// NOTE(review): the request buffer is freed here only when the append fails;
// on success the transaction presumably owns it - confirm.
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb           *pSdb = pMnode->pSdb;
  void           *pIter = NULL;
  int32_t         numOfReplicas = 0;
  SDAlterMnodeReq alterReq = {0};
  SDDropMnodeReq  dropReq = {0};
  SSetStandbyReq  standbyReq = {0};
  SEpSet          alterEpset = {0};
  SEpSet          dropEpSet = {0};

  // Collect every mnode except the one being dropped.
  while (1) {
    SMnodeObj *pMObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
    if (pIter == NULL) break;
    if (pMObj->id == pObj->id) {
      sdbRelease(pSdb, pMObj);
      continue;
    }

    alterReq.replicas[numOfReplicas].id = pMObj->id;
    alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);

    alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
    if (pMObj->state == TAOS_SYNC_STATE_LEADER) {
      alterEpset.inUse = numOfReplicas;
    }

    numOfReplicas++;
    sdbRelease(pSdb, pMObj);
  }

  alterReq.replica = numOfReplicas;
  alterEpset.numOfEps = numOfReplicas;

  dropReq.dnodeId = pDnode->id;
  dropEpSet.numOfEps = 1;
  dropEpSet.eps[0].port = pDnode->port;
  memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  standbyReq.dnodeId = pDnode->id;
  standbyReq.standby = 1;

  {
    int32_t bodyLen = tSerializeSSetStandbyReq(NULL, 0, &standbyReq);
    int32_t contLen = bodyLen + sizeof(SMsgHead);
    void   *pReq = taosMemoryMalloc(contLen);
    if (pReq == NULL) {
      // Never write the head or body through a NULL buffer.
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    // The body starts after the head, so only bodyLen bytes are available to
    // the serializer, not the full contLen.
    tSerializeSSetStandbyReq((char *)pReq + sizeof(SMsgHead), bodyLen, &standbyReq);
    SMsgHead *pHead = pReq;
    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(MNODE_HANDLE);

    STransAction action = {
        .epSet = dropEpSet,
        .pCont = pReq,
        .contLen = contLen,
        .msgType = TDMT_SYNC_SET_MNODE_STANDBY,
        .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED,
    };

    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
      taosMemoryFree(pReq);
      return -1;
    }
  }

  {
    int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
    void   *pReq = taosMemoryMalloc(contLen);
    if (pReq == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);

    STransAction action = {
        .epSet = alterEpset,
        .pCont = pReq,
        .contLen = contLen,
        .msgType = TDMT_MND_ALTER_MNODE,
        .acceptableCode = 0,
    };

    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
      taosMemoryFree(pReq);
      return -1;
    }
  }

  {
    int32_t contLen = tSerializeSCreateDropMQSBNodeReq(NULL, 0, &dropReq);
    void   *pReq = taosMemoryMalloc(contLen);
    if (pReq == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    tSerializeSCreateDropMQSBNodeReq(pReq, contLen, &dropReq);

    STransAction action = {
        .epSet = dropEpSet,
        .pCont = pReq,
        .contLen = contLen,
        .msgType = TDMT_DND_DROP_MNODE,
        .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED,
    };

    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
      taosMemoryFree(pReq);
      return -1;
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
568
// Adds everything required to drop the mnode pObj to an existing transaction:
// redo log, commit log, redo actions and a trailing null log.
// A NULL pObj is a no-op that returns 0. Returns -1 at the first failing step.
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  if (pObj == NULL) {
    return 0;
  }

  // Short-circuit evaluation keeps the original step order.
  if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0 ||
      mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0 ||
      mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0 ||
      mndTransAppendNullLog(pTrans) != 0) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
577
// Builds and prepares the serial "drop-mnode" transaction for pObj.
// Returns 0 when the transaction was prepared, -1 otherwise. The local
// transaction handle is dropped before returning in both cases.
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-mnode");

  if (pTrans != NULL) {
    mndTransSetSerial(pTrans);
    mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);

    if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj) == 0 && mndTransPrepare(pMnode, pTrans) == 0) {
      code = 0;
    }
  }

  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
596 597
// Handler for TDMT_MND_DROP_MNODE.
// Validation order: decode request -> privilege check -> dnodeId > 0 ->
// mnode must exist -> must not be the local node -> more than one mnode must
// remain -> the mnode's dnode must be online.
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been started,
// otherwise the failure code (terrno describes the reason).
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  SMDropMnodeReq req = {0};
  SMnodeObj     *pTarget = NULL;
  int32_t        code = -1;

  if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("mnode:%d, start to drop", req.dnodeId);
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
    goto _OVER;
  }

  if (req.dnodeId <= 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  pTarget = mndAcquireMnode(pMnode, req.dnodeId);
  if (pTarget == NULL) {
    goto _OVER;
  }

  // The node handling this request refuses to drop itself.
  if (req.dnodeId == pMnode->selfDnodeId) {
    terrno = TSDB_CODE_MND_CANT_DROP_LEADER;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
    terrno = TSDB_CODE_MND_TOO_FEW_MNODES;
    goto _OVER;
  }

  if (!mndIsDnodeOnline(pTarget->pDnode, taosGetTimestampMs())) {
    terrno = TSDB_CODE_NODE_OFFLINE;
    goto _OVER;
  }

  code = mndDropMnode(pMnode, pReq, pTarget);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("mnode:%d, failed to drop since %s", req.dnodeId, terrstr());
  }

  mndReleaseMnode(pMnode, pTarget);
  return code;
}

S
Shengliang Guan 已提交
649 650
// Retrieve callback for TSDB_MGMT_TABLE_MNODE: appends up to `rows` mnode
// rows to pBlock, resuming from pShow->pIter.
// Columns, in order: id, dnode endpoint, role, status, create time.
//   role   - "offline" by default; the leader string for the local node; when
//            the dnode is online, the string for the recorded sync state,
//            except that a non-local node recorded as leader is shown (and
//            logged) as an error.
//   status - "ready", or "creating"/"dropping" according to the SDB status.
// Returns the number of rows appended and adds it to pShow->numOfRows.
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SMnodeObj *pObj = NULL;
  ESdbStatus objStatus = 0;
  int64_t    curMs = taosGetTimestampMs();

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus);
    if (pShow->pIter == NULL) break;

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pObj->id, false);

    char b1[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(b1, pObj->pDnode->ep, TSDB_EP_LEN + VARSTR_HEADER_SIZE);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, b1, false);

    const char *roles = "offline";
    if (pObj->id == pMnode->selfDnodeId) {
      roles = syncStr(TAOS_SYNC_STATE_LEADER);
    }
    if (mndIsDnodeOnline(pObj->pDnode, curMs)) {
      roles = syncStr(pObj->state);
      if (pObj->state == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) {
        roles = syncStr(TAOS_SYNC_STATE_ERROR);
        mError("mnode:%d, is leader too", pObj->id);
      }
    }
    // NOTE(review): b2 is sized 12 + header while the copy limit comes from the
    // schema; presumably the schema's "role" column is declared with the same
    // width - confirm.
    char b2[12 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)b2, false);

    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    // NOTE(review): same size coupling as b2, here 9 + header - confirm.
    char b3[9 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(b3, status, pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)b3, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);

    numOfRows++;
    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += numOfRows;

  return numOfRows;
}

// Free-iterator callback for TSDB_MGMT_TABLE_MNODE: hands an unfinished
// iterator back to the SDB via sdbCancelFetch().
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}
S
Shengliang Guan 已提交
713 714

// Handler for TDMT_MND_ALTER_MNODE: converts the replica list in the request
// into an SSyncCfg, proposes it with syncReconfig(), then waits on
// pMgmt->syncSem and returns pMgmt->errCode.
// Returns -1 when the message cannot be decoded, when the replica count does
// not fit the fixed-size replica arrays (terrno = TSDB_CODE_INVALID_MSG in
// both cases), or when the local dnode is not among the replicas; returns the
// syncReconfig() error when the proposal is rejected.
static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SDAlterMnodeReq alterReq = {0};

  if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};

  // The replica count comes from the wire; reject values that would index
  // past either fixed-size array used in the loop below.
  const int32_t maxCfgNodes = (int32_t)(sizeof(cfg.nodeInfo) / sizeof(cfg.nodeInfo[0]));
  const int32_t maxReqReplicas = (int32_t)(sizeof(alterReq.replicas) / sizeof(alterReq.replicas[0]));
  if (alterReq.replica > maxCfgNodes || alterReq.replica > maxReqReplicas) {
    terrno = TSDB_CODE_INVALID_MSG;
    mError("failed to alter mnode since replica:%d is out of range", alterReq.replica);
    return -1;
  }

  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SNodeInfo *pNode = &cfg.nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, alterReq.replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = alterReq.replicas[i].port;
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) cfg.myIndex = i;
  }

  if (cfg.myIndex == -1) {
    mError("failed to alter mnode since myindex is -1");
    return -1;
  } else {
    mInfo("start to alter mnode sync, replica:%d myindex:%d", cfg.replicaNum, cfg.myIndex);
    for (int32_t i = 0; i < alterReq.replica; ++i) {
      SNodeInfo *pNode = &cfg.nodeInfo[i];
      mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
    }
  }

  mInfo("trans:-1, sync reconfig will be proposed");

  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  pMgmt->standby = 0;
  int32_t code = syncReconfig(pMgmt->sync, &cfg);
  if (code != 0) {
    mError("trans:-1, failed to propose sync reconfig since %s", terrstr());
    return code;
  } else {
    pMgmt->errCode = 0;
    // transId is updated under the write latch.
    taosWLockLatch(&pMgmt->lock);
    pMgmt->transId = -1;
    taosWUnLockLatch(&pMgmt->lock);
    // Blocks until syncSem is posted; presumably by the sync commit callback,
    // which is outside this file - confirm.
    tsem_wait(&pMgmt->syncSem);
    mInfo("alter mnode sync result:0x%x %s", pMgmt->errCode, tstrerror(pMgmt->errCode));
    terrno = pMgmt->errCode;
    return pMgmt->errCode;
  }
}