mndMnode.c 22.4 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndMnode.h"
S
Shengliang Guan 已提交
18
#include "mndAuth.h"
S
Shengliang Guan 已提交
19 20
#include "mndDnode.h"
#include "mndShow.h"
21
#include "mndSync.h"
S
Shengliang Guan 已提交
22
#include "mndTrans.h"
S
Shengliang Guan 已提交
23
#include "mndUser.h"
S
Shengliang Guan 已提交
24

25 26
#define MNODE_VER_NUMBER   1
#define MNODE_RESERVE_SIZE 64
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
static int32_t  mndCreateDefaultMnode(SMnode *pMnode);
S
Shengliang Guan 已提交
29
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj);
S
Shengliang Guan 已提交
30
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw);
S
Shengliang Guan 已提交
31 32
static int32_t  mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj);
static int32_t  mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj);
S
Shengliang Guan 已提交
33
static int32_t  mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew);
S
Shengliang Guan 已提交
34
static int32_t  mndProcessCreateMnodeReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
35
static int32_t  mndProcessAlterMnodeReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
36 37
static int32_t  mndProcessDropMnodeReq(SRpcMsg *pReq);
static int32_t  mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
38
static void     mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
39 40

int32_t mndInitMnode(SMnode *pMnode) {
S
Shengliang Guan 已提交
41 42 43 44 45 46 47 48 49 50
  SSdbTable table = {
      .sdbType = SDB_MNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultMnode,
      .encodeFp = (SdbEncodeFp)mndMnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndMnodeActionDecode,
      .insertFp = (SdbInsertFp)mndMnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndMnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndMnodeActionDelete,
  };
S
Shengliang Guan 已提交
51

H
Hongze Cheng 已提交
52
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
53
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndTransProcessRsp);
S
Shengliang Guan 已提交
54
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE, mndProcessAlterMnodeReq);
55
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
H
Hongze Cheng 已提交
56
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
57
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);
S
Shengliang Guan 已提交
58 59 60

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
S
Shengliang Guan 已提交
61 62 63 64 65 66

  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupMnode(SMnode *pMnode) {}

67 68
SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
  SMnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId);
S
Shengliang Guan 已提交
69
  if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
S
Shengliang Guan 已提交
70 71 72
    terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
  }
  return pObj;
S
Shengliang Guan 已提交
73 74
}

75
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
S
Shengliang Guan 已提交
76
  SSdb *pSdb = pMnode->pSdb;
77
  sdbRelease(pMnode->pSdb, pObj);
S
Shengliang Guan 已提交
78 79
}

S
Shengliang Guan 已提交
80 81 82 83 84 85 86 87 88 89
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
  SMnodeObj mnodeObj = {0};
  mnodeObj.id = 1;
  mnodeObj.createdTime = taosGetTimestampMs();
  mnodeObj.updateTime = mnodeObj.createdTime;

  SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj);
  if (pRaw == NULL) return -1;
  sdbSetRawStatus(pRaw, SDB_STATUS_READY);

90
  mDebug("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);
91

92
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
  if (pTrans == NULL) {
    mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
    return -1;
  }
  mDebug("trans:%d, used to create mnode:%d", pTrans->id, mnodeObj.id);

  if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }

  mndTransDrop(pTrans);
  return 0;
S
Shengliang Guan 已提交
114 115
}

S
Shengliang Guan 已提交
116
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
117 118
  terrno = TSDB_CODE_OUT_OF_MEMORY;

119 120
  SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, MNODE_VER_NUMBER, sizeof(SMnodeObj) + MNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;
S
Shengliang Guan 已提交
121 122

  int32_t dataPos = 0;
123 124 125 126
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)
127 128 129

  terrno = 0;

130
_OVER:
131 132 133 134 135
  if (terrno != 0) {
    mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }
S
Shengliang Guan 已提交
136

137
  mTrace("mnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
S
Shengliang Guan 已提交
138 139 140 141
  return pRaw;
}

static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
142 143
  terrno = TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
144 145 146
  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;

147
  if (sver != MNODE_VER_NUMBER) {
S
Shengliang Guan 已提交
148
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
149
    goto _OVER;
S
Shengliang Guan 已提交
150 151
  }

152
  SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj));
153
  if (pRow == NULL) goto _OVER;
154

S
Shengliang Guan 已提交
155
  SMnodeObj *pObj = sdbGetRowObj(pRow);
156
  if (pObj == NULL) goto _OVER;
S
Shengliang Guan 已提交
157 158

  int32_t dataPos = 0;
159 160 161 162
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)
163 164 165

  terrno = 0;

166
_OVER:
167 168
  if (terrno != 0) {
    mError("mnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
169
    taosMemoryFreeClear(pRow);
170 171
    return NULL;
  }
S
Shengliang Guan 已提交
172

173
  mTrace("mnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
S
Shengliang Guan 已提交
174 175 176
  return pRow;
}

S
Shengliang Guan 已提交
177
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
178
  mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);
S
Shengliang Guan 已提交
179 180
  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode == NULL) {
S
Shengliang Guan 已提交
181
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
S
Shengliang Guan 已提交
182
    mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
S
Shengliang Guan 已提交
183 184 185
    return -1;
  }

186
  pObj->state = TAOS_SYNC_STATE_ERROR;
S
Shengliang Guan 已提交
187 188 189
  return 0;
}

S
Shengliang Guan 已提交
190
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
191
  mTrace("mnode:%d, perform delete action, row:%p", pObj->id, pObj);
S
Shengliang Guan 已提交
192 193 194
  if (pObj->pDnode != NULL) {
    sdbRelease(pSdb, pObj->pDnode);
    pObj->pDnode = NULL;
S
Shengliang Guan 已提交
195 196 197 198 199
  }

  return 0;
}

S
Shengliang Guan 已提交
200
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
S
Shengliang Guan 已提交
201
  mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
S
Shengliang Guan 已提交
202
  pOld->updateTime = pNew->updateTime;
S
Shengliang Guan 已提交
203
  return 0;
S
Shengliang Guan 已提交
204 205 206 207 208
}

bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb *pSdb = pMnode->pSdb;

S
Shengliang Guan 已提交
209 210
  SMnodeObj *pObj = sdbAcquire(pSdb, SDB_MNODE, &dnodeId);
  if (pObj == NULL) {
S
Shengliang Guan 已提交
211 212 213
    return false;
  }

S
Shengliang Guan 已提交
214
  sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
215
  return true;
S
Shengliang Guan 已提交
216 217
}

S
Shengliang Guan 已提交
218
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
219 220 221
  SSdb   *pSdb = pMnode->pSdb;
  int32_t totalMnodes = sdbGetSize(pSdb, SDB_MNODE);
  void   *pIter = NULL;
S
Shengliang Guan 已提交
222 223

  while (1) {
S
Shengliang Guan 已提交
224 225
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
S
Shengliang Guan 已提交
226
    if (pIter == NULL) break;
227 228 229

    if (pObj->id == pMnode->selfDnodeId) {
      if (mndIsMaster(pMnode)) {
230
        pEpSet->inUse = pEpSet->numOfEps;
231 232
      } else {
        pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes;
233
      }
S
Shengliang Guan 已提交
234
    }
235 236
    addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
237
  }
S
Shengliang Guan 已提交
238 239
}

240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
  if (pRedoRaw == NULL) return -1;
  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
  return 0;
}

static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj);
  if (pUndoRaw == NULL) return -1;
  if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
  if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
264
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
S
Shengliang Guan 已提交
265 266 267 268
  SSdb            *pSdb = pMnode->pSdb;
  void            *pIter = NULL;
  int32_t          numOfReplicas = 0;
  SDAlterMnodeReq  alterReq = {0};
S
Shengliang Guan 已提交
269
  SDCreateMnodeReq createReq = {0};
S
Shengliang Guan 已提交
270 271 272
  SEpSet           alterEpset = {0};
  SEpSet           createEpset = {0};

S
Shengliang Guan 已提交
273 274 275 276
  while (1) {
    SMnodeObj *pMObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
    if (pIter == NULL) break;
277

S
Shengliang Guan 已提交
278 279 280
    alterReq.replicas[numOfReplicas].id = pMObj->id;
    alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
281

S
Shengliang Guan 已提交
282 283 284 285 286 287 288
    alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
    if (pMObj->state == TAOS_SYNC_STATE_LEADER) {
      alterEpset.inUse = numOfReplicas;
    }

    numOfReplicas++;
S
Shengliang Guan 已提交
289 290 291
    sdbRelease(pSdb, pMObj);
  }

S
Shengliang Guan 已提交
292 293 294 295
  alterReq.replica = numOfReplicas + 1;
  alterReq.replicas[numOfReplicas].id = pDnode->id;
  alterReq.replicas[numOfReplicas].port = pDnode->port;
  memcpy(alterReq.replicas[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
296

S
Shengliang Guan 已提交
297 298 299
  alterEpset.numOfEps = numOfReplicas + 1;
  alterEpset.eps[numOfReplicas].port = pDnode->port;
  memcpy(alterEpset.eps[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
300

S
Shengliang Guan 已提交
301 302 303 304
  createReq.replica = 1;
  createReq.replicas[0].id = pDnode->id;
  createReq.replicas[0].port = pDnode->port;
  memcpy(createReq.replicas[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
305

S
Shengliang Guan 已提交
306 307 308
  createEpset.numOfEps = 1;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
309

S
Shengliang Guan 已提交
310
  {
311
    int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
wafwerar's avatar
wafwerar 已提交
312
    void   *pReq = taosMemoryMalloc(contLen);
313
    tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);
S
Shengliang Guan 已提交
314

S
Shengliang Guan 已提交
315
    STransAction action = {
316
        .epSet = createEpset,
S
Shengliang Guan 已提交
317 318
        .pCont = pReq,
        .contLen = contLen,
319 320
        .msgType = TDMT_DND_CREATE_MNODE,
        .acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED,
S
Shengliang Guan 已提交
321
    };
S
Shengliang Guan 已提交
322 323

    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
wafwerar's avatar
wafwerar 已提交
324
      taosMemoryFree(pReq);
S
Shengliang Guan 已提交
325 326 327 328 329
      return -1;
    }
  }

  {
330
    int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
wafwerar's avatar
wafwerar 已提交
331
    void   *pReq = taosMemoryMalloc(contLen);
332
    tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
S
Shengliang Guan 已提交
333

S
Shengliang Guan 已提交
334
    STransAction action = {
335
        .epSet = alterEpset,
S
Shengliang Guan 已提交
336 337
        .pCont = pReq,
        .contLen = contLen,
S
Shengliang Guan 已提交
338
        .msgType = TDMT_MND_ALTER_MNODE,
339
        .acceptableCode = 0,
S
Shengliang Guan 已提交
340 341
    };

S
Shengliang Guan 已提交
342
    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
wafwerar's avatar
wafwerar 已提交
343
      taosMemoryFree(pReq);
S
Shengliang Guan 已提交
344 345
      return -1;
    }
346 347 348 349 350
  }

  return 0;
}

S
Shengliang Guan 已提交
351
static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
S
Shengliang Guan 已提交
352 353
  int32_t code = -1;

S
Shengliang Guan 已提交
354
  SMnodeObj mnodeObj = {0};
S
Shengliang Guan 已提交
355
  mnodeObj.id = pDnode->id;
S
Shengliang Guan 已提交
356 357 358
  mnodeObj.createdTime = taosGetTimestampMs();
  mnodeObj.updateTime = mnodeObj.createdTime;

359
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
360
  if (pTrans == NULL) goto _OVER;
361
  mndTransSetSerial(pTrans);
S
Shengliang Guan 已提交
362 363
  mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);

364 365 366
  if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
  if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
  if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
367
  if (mndTransAppendNullLog(pTrans) != 0) goto _OVER;
368
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
S
Shengliang Guan 已提交
369

370 371
  code = 0;

372
_OVER:
S
Shengliang Guan 已提交
373
  mndTransDrop(pTrans);
374
  return code;
S
Shengliang Guan 已提交
375 376
}

S
Shengliang Guan 已提交
377 378
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
379 380 381 382 383 384
  int32_t          code = -1;
  SMnodeObj       *pObj = NULL;
  SDnodeObj       *pDnode = NULL;
  SUserObj        *pUser = NULL;
  SMCreateMnodeReq createReq = {0};

S
Shengliang Guan 已提交
385
  if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
S
Shengliang Guan 已提交
386
    terrno = TSDB_CODE_INVALID_MSG;
387
    goto _OVER;
S
Shengliang Guan 已提交
388
  }
S
Shengliang Guan 已提交
389

S
Shengliang Guan 已提交
390
  mDebug("mnode:%d, start to create", createReq.dnodeId);
S
Shengliang Guan 已提交
391

S
Shengliang Guan 已提交
392
  pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
S
Shengliang Guan 已提交
393
  if (pObj != NULL) {
S
Shengliang Guan 已提交
394
    terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
395
    goto _OVER;
S
Shengliang Guan 已提交
396
  } else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
397
    goto _OVER;
S
Shengliang Guan 已提交
398 399
  }

400 401 402 403 404
  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
    terrno = TSDB_CODE_MND_TOO_MANY_MNODES;
    goto _OVER;
  }

S
Shengliang Guan 已提交
405
  pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
S
Shengliang Guan 已提交
406 407
  if (pDnode == NULL) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
408
    goto _OVER;
S
Shengliang Guan 已提交
409 410
  }

S
Shengliang Guan 已提交
411
  if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
412 413 414 415
    terrno = TSDB_CODE_NODE_OFFLINE;
    goto _OVER;
  }

S
Shengliang Guan 已提交
416
  pUser = mndAcquireUser(pMnode, pReq->conn.user);
S
Shengliang Guan 已提交
417 418
  if (pUser == NULL) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
419
    goto _OVER;
S
Shengliang Guan 已提交
420
  }
S
Shengliang Guan 已提交
421

S
Shengliang Guan 已提交
422
  if (mndCheckNodeAuth(pUser) != 0) {
423
    goto _OVER;
S
Shengliang Guan 已提交
424 425 426
  }

  code = mndCreateMnode(pMnode, pReq, pDnode, &createReq);
S
Shengliang Guan 已提交
427
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
S
Shengliang Guan 已提交
428

429
_OVER:
S
Shengliang Guan 已提交
430
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
431
    mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
S
Shengliang Guan 已提交
432 433
  }

S
Shengliang Guan 已提交
434 435 436 437 438
  mndReleaseMnode(pMnode, pObj);
  mndReleaseDnode(pMnode, pDnode);
  mndReleaseUser(pMnode, pUser);

  return code;
S
Shengliang Guan 已提交
439 440
}

441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
  if (pRedoRaw == NULL) return -1;
  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
  return 0;
}

static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
457
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
S
Shengliang Guan 已提交
458 459 460
  SSdb           *pSdb = pMnode->pSdb;
  void           *pIter = NULL;
  int32_t         numOfReplicas = 0;
S
Shengliang Guan 已提交
461
  SDAlterMnodeReq alterReq = {0};
S
Shengliang Guan 已提交
462 463 464 465
  SDDropMnodeReq  dropReq = {0};
  SEpSet          alterEpset = {0};
  SEpSet          dropEpSet = {0};

S
Shengliang Guan 已提交
466 467 468 469
  while (1) {
    SMnodeObj *pMObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
    if (pIter == NULL) break;
S
Shengliang Guan 已提交
470 471 472 473 474 475 476 477
    if (pMObj->id == pObj->id) {
      sdbRelease(pSdb, pMObj);
      continue;
    }

    alterReq.replicas[numOfReplicas].id = pMObj->id;
    alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
478

S
Shengliang Guan 已提交
479 480 481 482
    alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port;
    memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
    if (pMObj->state == TAOS_SYNC_STATE_LEADER) {
      alterEpset.inUse = numOfReplicas;
S
Shengliang Guan 已提交
483
    }
484

S
Shengliang Guan 已提交
485
    numOfReplicas++;
S
Shengliang Guan 已提交
486 487 488
    sdbRelease(pSdb, pMObj);
  }

S
Shengliang Guan 已提交
489
  alterReq.replica = numOfReplicas;
S
Shengliang Guan 已提交
490
  alterEpset.numOfEps = numOfReplicas;
S
Shengliang Guan 已提交
491

S
Shengliang Guan 已提交
492 493 494 495
  dropReq.dnodeId = pDnode->id;
  dropEpSet.numOfEps = 1;
  dropEpSet.eps[0].port = pDnode->port;
  memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
496

S
Shengliang Guan 已提交
497 498 499 500 501 502 503 504 505
  {
    int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
    void   *pReq = taosMemoryMalloc(contLen);
    tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);

    STransAction action = {
        .epSet = alterEpset,
        .pCont = pReq,
        .contLen = contLen,
S
Shengliang Guan 已提交
506
        .msgType = TDMT_MND_ALTER_MNODE,
S
Shengliang Guan 已提交
507 508 509 510 511 512 513
        .acceptableCode = 0,
    };

    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
      taosMemoryFree(pReq);
      return -1;
    }
S
Shengliang Guan 已提交
514 515 516
  }

  {
517
    int32_t contLen = tSerializeSCreateDropMQSBNodeReq(NULL, 0, &dropReq);
wafwerar's avatar
wafwerar 已提交
518
    void   *pReq = taosMemoryMalloc(contLen);
519
    tSerializeSCreateDropMQSBNodeReq(pReq, contLen, &dropReq);
S
Shengliang Guan 已提交
520

S
Shengliang Guan 已提交
521 522 523 524 525 526 527 528
    STransAction action = {
        .epSet = dropEpSet,
        .pCont = pReq,
        .contLen = contLen,
        .msgType = TDMT_DND_DROP_MNODE,
        .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED,
    };

S
Shengliang Guan 已提交
529
    if (mndTransAppendRedoAction(pTrans, &action) != 0) {
wafwerar's avatar
wafwerar 已提交
530
      taosMemoryFree(pReq);
S
Shengliang Guan 已提交
531 532
      return -1;
    }
533 534 535 536 537
  }

  return 0;
}

S
Shengliang Guan 已提交
538
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
539
  if (pObj == NULL) return 0;
S
Shengliang Guan 已提交
540 541 542 543 544 545 546
  if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1;
  if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) return -1;
  if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) return -1;
  if (mndTransAppendNullLog(pTrans) != 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
547
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
548
  int32_t code = -1;
S
Shengliang Guan 已提交
549
  STrans *pTrans = NULL;
S
Shengliang Guan 已提交
550

S
Shengliang Guan 已提交
551
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
552
  if (pTrans == NULL) goto _OVER;
S
Shengliang Guan 已提交
553
  mndTransSetSerial(pTrans);
554
  mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
S
Shengliang Guan 已提交
555

S
Shengliang Guan 已提交
556
  if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj) != 0) goto _OVER;
557
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
S
Shengliang Guan 已提交
558

559 560
  code = 0;

561
_OVER:
S
Shengliang Guan 已提交
562
  mndTransDrop(pTrans);
563
  return code;
S
Shengliang Guan 已提交
564 565
}

S
Shengliang Guan 已提交
566 567
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
568 569 570 571 572
  int32_t        code = -1;
  SUserObj      *pUser = NULL;
  SMnodeObj     *pObj = NULL;
  SMDropMnodeReq dropReq = {0};

S
Shengliang Guan 已提交
573
  if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
S
Shengliang Guan 已提交
574
    terrno = TSDB_CODE_INVALID_MSG;
575
    goto _OVER;
S
Shengliang Guan 已提交
576
  }
S
Shengliang Guan 已提交
577

S
Shengliang Guan 已提交
578
  mDebug("mnode:%d, start to drop", dropReq.dnodeId);
S
Shengliang Guan 已提交
579

S
Shengliang Guan 已提交
580
  if (dropReq.dnodeId <= 0) {
581 582
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
S
Shengliang Guan 已提交
583 584
  }

S
Shengliang Guan 已提交
585
  pObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
S
Shengliang Guan 已提交
586
  if (pObj == NULL) {
587 588 589
    goto _OVER;
  }

590
  if (pMnode->selfDnodeId == dropReq.dnodeId) {
S
Shengliang Guan 已提交
591
    terrno = TSDB_CODE_MND_CANT_DROP_LEADER;
592 593 594 595 596 597
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
    terrno = TSDB_CODE_MND_TOO_FEW_MNODES;
    goto _OVER;
S
Shengliang Guan 已提交
598 599
  }

S
Shengliang Guan 已提交
600
  pUser = mndAcquireUser(pMnode, pReq->conn.user);
S
Shengliang Guan 已提交
601 602
  if (pUser == NULL) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
603
    goto _OVER;
S
Shengliang Guan 已提交
604 605
  }

S
Shengliang Guan 已提交
606
  if (mndCheckNodeAuth(pUser) != 0) {
607
    goto _OVER;
S
Shengliang Guan 已提交
608
  }
S
Shengliang Guan 已提交
609

S
Shengliang Guan 已提交
610
  code = mndDropMnode(pMnode, pReq, pObj);
S
Shengliang Guan 已提交
611
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
S
Shengliang Guan 已提交
612

613
_OVER:
S
Shengliang Guan 已提交
614
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
615 616 617 618 619 620 621
    mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
  }

  mndReleaseMnode(pMnode, pObj);
  mndReleaseUser(pMnode, pUser);

  return code;
S
Shengliang Guan 已提交
622 623
}

S
Shengliang Guan 已提交
624 625
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
626 627 628
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
S
Shengliang Guan 已提交
629
  SMnodeObj *pObj = NULL;
630
  ESdbStatus objStatus;
S
Shengliang Guan 已提交
631
  char      *pWrite;
632
  int64_t    curMs = taosGetTimestampMs();
S
Shengliang Guan 已提交
633 634

  while (numOfRows < rows) {
635
    pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus);
S
Shengliang Guan 已提交
636 637 638
    if (pShow->pIter == NULL) break;

    cols = 0;
S
Shengliang Guan 已提交
639 640
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pObj->id, false);
S
Shengliang Guan 已提交
641

642
    char b1[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
643
    STR_WITH_MAXSIZE_TO_VARSTR(b1, pObj->pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
644

645 646
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, b1, false);
S
Shengliang Guan 已提交
647

648
    const char *roles = "offline";
649
    if (pObj->id == pMnode->selfDnodeId) {
650 651
      roles = syncStr(TAOS_SYNC_STATE_LEADER);
    }
S
Shengliang Guan 已提交
652
    if (pObj->pDnode && mndIsDnodeOnline(pObj->pDnode, curMs)) {
653 654 655
      roles = syncStr(pObj->state);
    }
    char b2[12 + VARSTR_HEADER_SIZE] = {0};
656
    STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
657
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
S
Shengliang Guan 已提交
658
    colDataAppend(pColInfo, numOfRows, (const char *)b2, false);
S
Shengliang Guan 已提交
659

660 661 662
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
663 664 665 666 667
    char b3[9 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(b3, status, pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)b3, false);

668 669
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
S
Shengliang Guan 已提交
670 671

    numOfRows++;
S
Shengliang Guan 已提交
672
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
673 674
  }

675
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
676 677 678 679 680 681 682 683

  return numOfRows;
}

static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}
S
Shengliang Guan 已提交
684 685

static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
S
Shengliang Guan 已提交
686
  SMnode         *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712
  SDAlterMnodeReq alterReq = {0};

  if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SNodeInfo *pNode = &cfg.nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, alterReq.replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = alterReq.replicas[i].port;
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) cfg.myIndex = i;
  }

  if (cfg.myIndex == -1) {
    mError("failed to alter mnode since myindex is -1");
    return -1;
  } else {
    mInfo("start to alter mnode sync, replica:%d myindex:%d", cfg.replicaNum, cfg.myIndex);
    for (int32_t i = 0; i < alterReq.replica; ++i) {
      SNodeInfo *pNode = &cfg.nodeInfo[i];
      mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
    }
  }

S
Shengliang Guan 已提交
713 714
  mTrace("trans:-1, sync reconfig will be proposed");

S
Shengliang Guan 已提交
715 716 717 718
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  pMgmt->standby = 0;
  int32_t code = syncReconfig(pMgmt->sync, &cfg);
  if (code != 0) {
S
Shengliang Guan 已提交
719
    mError("trans:-1, failed to propose sync reconfig since %s", terrstr());
S
Shengliang Guan 已提交
720 721 722
    return code;
  } else {
    pMgmt->errCode = 0;
S
Shengliang Guan 已提交
723
    pMgmt->transId = -1;
S
Shengliang Guan 已提交
724 725 726 727 728 729
    tsem_wait(&pMgmt->syncSem);
    mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode));
    terrno = pMgmt->errCode;
    return pMgmt->errCode;
  }
}