mndMnode.c 25.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndMnode.h"
#include "mndDnode.h"
H
Hongze Cheng 已提交
19
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
20
#include "mndShow.h"
21
#include "mndSync.h"
S
Shengliang Guan 已提交
22
#include "mndTrans.h"
H
Haojun Liao 已提交
23
#include "tmisce.h"
24
#include "mndCluster.h"
S
Shengliang Guan 已提交
25

26 27
#define MNODE_VER_NUMBER   1
#define MNODE_RESERVE_SIZE 64
S
Shengliang Guan 已提交
28

S
Shengliang Guan 已提交
29
static int32_t  mndCreateDefaultMnode(SMnode *pMnode);
S
Shengliang Guan 已提交
30
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj);
S
Shengliang Guan 已提交
31
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw);
S
Shengliang Guan 已提交
32 33
static int32_t  mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj);
static int32_t  mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj);
S
Shengliang Guan 已提交
34
static int32_t  mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew);
S
Shengliang Guan 已提交
35
static int32_t  mndProcessCreateMnodeReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
36
static int32_t  mndProcessAlterMnodeReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
37 38
static int32_t  mndProcessDropMnodeReq(SRpcMsg *pReq);
static int32_t  mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
39
static void     mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
40
static void     mndReloadSyncConfig(SMnode *pMnode);
S
Shengliang Guan 已提交
41 42

/*
 * Wires the mnode module into the management node: installs the request and
 * response handlers, the "show" retrieve/free-iterator hooks, and finally
 * registers the SDB_MNODE table description with the sdb.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitMnode(SMnode *pMnode) {
  SSdbTable mnodeTable = {0};
  mnodeTable.sdbType = SDB_MNODE;
  mnodeTable.keyType = SDB_KEY_INT32;
  mnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultMnode;
  mnodeTable.encodeFp = (SdbEncodeFp)mndMnodeActionEncode;
  mnodeTable.decodeFp = (SdbDecodeFp)mndMnodeActionDecode;
  mnodeTable.insertFp = (SdbInsertFp)mndMnodeActionInsert;
  mnodeTable.updateFp = (SdbUpdateFp)mndMnodeActionUpdate;
  mnodeTable.deleteFp = (SdbDeleteFp)mndMnodeActionDelete;

  // Requests are handled locally; the matching responses are routed to the transaction engine.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE, mndProcessAlterMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);

  return sdbSetTable(pMnode->pSdb, mnodeTable);
}

/* Counterpart of mndInitMnode(); currently there is nothing to release. */
void mndCleanupMnode(SMnode *pMnode) {
  (void)pMnode;
}

69 70
/*
 * Looks up the mnode row keyed by mnodeId in the sdb.
 *
 * Returns the acquired row, or NULL on failure. When the sdb reports
 * TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to the mnode-specific
 * TSDB_CODE_MND_MNODE_NOT_EXIST; any other terrno is left as set by sdbAcquire().
 * A non-NULL result should be handed back through mndReleaseMnode().
 */
SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
  SMnodeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
  }
  return NULL;
}

77
/*
 * Hands an mnode row obtained from mndAcquireMnode() back to the sdb.
 * pObj is forwarded to sdbRelease() unchanged (callers in this file pass
 * NULL on their error paths).
 */
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}

S
Shengliang Guan 已提交
82 83 84 85 86 87 88 89
/*
 * Deploy callback of the SDB_MNODE table: builds the row for mnode id 1,
 * marks it SDB_STATUS_READY and submits it as the commit log of a
 * "create-mnode" transaction.
 *
 * Returns 0 when the transaction was prepared, -1 otherwise.
 */
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
  SMnodeObj firstMnode = {0};
  firstMnode.id = 1;
  firstMnode.updateTime = firstMnode.createdTime = taosGetTimestampMs();

  SSdbRaw *pRaw = mndMnodeActionEncode(&firstMnode);
  if (pRaw == NULL) {
    return -1;
  }
  (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);

  mInfo("mnode:%d, will be created when deploying, raw:%p", firstMnode.id, pRaw);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-mnode");
  if (pTrans == NULL) {
    // The raw has not been attached to a transaction yet, so it is released here.
    sdbFreeRaw(pRaw);
    mError("mnode:%d, failed to create since %s", firstMnode.id, terrstr());
    return -1;
  }
  mInfo("trans:%d, used to create mnode:%d", pTrans->id, firstMnode.id);

  int32_t code = -1;
  if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
  } else {
    (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
    if (mndTransPrepare(pMnode, pTrans) != 0) {
      mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    } else {
      code = 0;
    }
  }

  // The local transaction handle is dropped on success and failure alike.
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
119
/*
 * Serializes an mnode object into a freshly allocated sdb raw record:
 * id (int32), createdTime (int64), updateTime (int64), followed by
 * MNODE_RESERVE_SIZE reserve bytes.
 *
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and cleared only after every
 * field has been written; the SDB_SET_* macros jump to _OVER on failure.
 * Returns the raw record, or NULL (with terrno != 0) on failure.
 */
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_MNODE, MNODE_VER_NUMBER, sizeof(SMnodeObj) + MNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("mnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

/*
 * Rebuilds an mnode row from an sdb raw record written by the encoder:
 * id (int32), createdTime (int64), updateTime (int64), then the reserve bytes.
 *
 * Returns the new row, or NULL on failure. A raw whose soft version is not
 * MNODE_VER_NUMBER fails with TSDB_CODE_SDB_INVALID_DATA_VER. If the soft
 * version cannot be read at all, NULL is returned immediately without logging.
 */
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
  SMnodeObj *pMnodeObj = NULL;
  SSdbRow   *pRow = NULL;
  int32_t    dataPos = 0;
  int8_t     sver = 0;

  // Pessimistic default; cleared only after all fields were read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;

  if (sver != MNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SMnodeObj));
  if (pRow == NULL) goto _OVER;

  pMnodeObj = sdbGetRowObj(pRow);
  if (pMnodeObj == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pMnodeObj->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pMnodeObj->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pMnodeObj->updateTime, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("mnode:%d, decode from raw:%p, row:%p", pMnodeObj->id, pRaw, pMnodeObj);
    return pRow;
  }

  mError("mnode:%d, failed to decode from raw:%p since %s", pMnodeObj == NULL ? 0 : pMnodeObj->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}

S
Shengliang Guan 已提交
182
/*
 * Insert callback of the SDB_MNODE table. Binds the new mnode row to the
 * dnode row with the same id (the acquired reference is kept in
 * pObj->pDnode), starts the row in TAOS_SYNC_STATE_OFFLINE and refreshes the
 * sync configuration.
 *
 * Returns 0 on success; -1 with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST when
 * no such dnode can be acquired.
 */
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
  mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);

  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode != NULL) {
    pObj->syncState = TAOS_SYNC_STATE_OFFLINE;
    mndReloadSyncConfig(pSdb->pMnode);
    return 0;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
196
/*
 * Delete callback of the SDB_MNODE table. Gives back the dnode reference
 * held in pObj->pDnode, if any, and clears the pointer. Always returns 0.
 */
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
  mTrace("mnode:%d, perform delete action, row:%p", pObj->id, pObj);

  if (pObj->pDnode == NULL) {
    return 0;
  }

  sdbRelease(pSdb, pObj->pDnode);
  pObj->pDnode = NULL;
  return 0;
}

S
Shengliang Guan 已提交
206
/*
 * Update callback of the SDB_MNODE table. Only updateTime is carried over
 * from the new row into the existing one; afterwards the sync configuration
 * is refreshed. Always returns 0.
 */
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
  mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;

  mndReloadSyncConfig(pSdb->pMnode);
  return 0;
}

/*
 * Reports whether the sdb holds an mnode row keyed by dnodeId.
 * The row is acquired only for the check and released again immediately.
 */
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pProbe = sdbAcquire(pSdb, SDB_MNODE, &dnodeId);

  if (pProbe != NULL) {
    sdbRelease(pSdb, pProbe);
    return true;
  }
  return false;
}

S
Shengliang Guan 已提交
226
/*
 * Fills pEpSet with the endpoints of all mnodes known to the sdb.
 *
 * - With no mnode rows at all, the set comes from syncGetRetryEpSet().
 * - While walking the rows, the entry for this node decides inUse: the slot
 *   about to be filled when this node is the leader, otherwise the slot
 *   after it (modulo the mnode count).
 * - Rows without a bound dnode contribute no endpoint.
 * - If nothing was added, syncGetRetryEpSet() is used as a fallback, and an
 *   out-of-range inUse is reset to 0.
 *
 * Endpoints are appended to whatever pEpSet already contains; the function
 * itself never resets numOfEps.
 */
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t mnodeCount = sdbGetSize(pSdb, SDB_MNODE);

  if (mnodeCount == 0) {
    syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet);
    return;
  }

  void *pIter = NULL;
  for (;;) {
    SMnodeObj *pCur = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pCur);
    if (pIter == NULL) {
      break;
    }

    if (pCur->id == pMnode->selfDnodeId) {
      pEpSet->inUse = mndIsLeader(pMnode) ? pEpSet->numOfEps : (pEpSet->numOfEps + 1) % mnodeCount;
    }

    if (pCur->pDnode != NULL) {
      addEpIntoEpSet(pEpSet, pCur->pDnode->fqdn, pCur->pDnode->port);
    }

    sdbRelease(pSdb, pCur);
  }

  if (pEpSet->numOfEps == 0) {
    syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet);
  }

  if (pEpSet->inUse >= pEpSet->numOfEps) {
    pEpSet->inUse = 0;
  }
}

262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285
/*
 * Encodes pObj, appends the raw to the transaction's redo log and then marks
 * it SDB_STATUS_CREATING. Returns 0 on success, -1 as soon as any step fails.
 */
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  // Steps run left to right; the first failure short-circuits the rest.
  if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_CREATING) != 0) {
    return -1;
  }
  return 0;
}

/*
 * Encodes pObj, appends the raw to the transaction's undo log and then marks
 * it SDB_STATUS_DROPPED. Returns 0 on success, -1 as soon as any step fails.
 */
static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  // Steps run left to right; the first failure short-circuits the rest.
  if (pRaw == NULL || mndTransAppendUndolog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

/*
 * Encodes pObj, appends the raw to the transaction's commit log and then
 * marks it SDB_STATUS_READY. Returns 0 on success, -1 as soon as any step fails.
 */
static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  // Steps run left to right; the first failure short-circuits the rest.
  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }
  return 0;
}

286 287 288 289 290 291 292 293 294 295
/*
 * Serializes pCreateReq and appends it to pTrans as a TDMT_DND_CREATE_MNODE
 * redo action addressed to pCreateEpSet. TSDB_CODE_MNODE_ALREADY_DEPLOYED is
 * registered as an acceptable response code.
 *
 * Returns 0 on success. Returns -1 when the message buffer cannot be
 * allocated (terrno = TSDB_CODE_OUT_OF_MEMORY) or when the action cannot be
 * appended; in the latter case the buffer is freed here. After a successful
 * append the buffer is not freed by this function (presumably the
 * transaction takes it over - confirm against mndTransAppendRedoAction).
 */
static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pCreateReq, SEpSet *pCreateEpSet) {
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pCreateReq);
  void   *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    // Never serialize into a failed allocation.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSDCreateMnodeReq(pReq, contLen, pCreateReq);

  STransAction action = {
      .epSet = *pCreateEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_DND_CREATE_MNODE,
      .acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED,
  };

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(pReq);
    return -1;
  }
  return 0;
}

/*
 * Serializes pAlterReq and appends it to pTrans as a TDMT_MND_ALTER_MNODE
 * redo action addressed to pAlterEpSet. No extra response code is accepted
 * (acceptableCode is 0).
 *
 * Returns 0 on success. Returns -1 when the message buffer cannot be
 * allocated (terrno = TSDB_CODE_OUT_OF_MEMORY) or when the action cannot be
 * appended; in the latter case the buffer is freed here.
 */
static int32_t mndBuildAlterMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pAlterReq, SEpSet *pAlterEpSet) {
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterReq);
  void   *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    // Never serialize into a failed allocation.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSDCreateMnodeReq(pReq, contLen, pAlterReq);

  STransAction action = {
      .epSet = *pAlterEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_MND_ALTER_MNODE,
      .acceptableCode = 0,
  };

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(pReq);
    return -1;
  }

  return 0;
}

/*
 * Serializes pDropReq and appends it to pTrans as a TDMT_DND_DROP_MNODE redo
 * action addressed to pDroprEpSet. TSDB_CODE_MNODE_NOT_DEPLOYED is registered
 * as an acceptable response code.
 *
 * Returns 0 on success. Returns -1 when the message buffer cannot be
 * allocated (terrno = TSDB_CODE_OUT_OF_MEMORY) or when the action cannot be
 * appended; in the latter case the buffer is freed here.
 */
static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDropReq, SEpSet *pDroprEpSet) {
  int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, pDropReq);
  void   *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    // Never serialize into a failed allocation.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSCreateDropMQSNodeReq(pReq, contLen, pDropReq);

  STransAction action = {
      .epSet = *pDroprEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_DND_DROP_MNODE,
      .acceptableCode = TSDB_CODE_MNODE_NOT_DEPLOYED,
  };

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(pReq);
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
347
/*
 * Builds the create-mnode request sent to the dnode that is about to host
 * the new mnode. The replica list holds every mnode currently in the sdb,
 * in fetch order, followed by the new dnode as the last entry. The request
 * is addressed to a single-endpoint set pointing at pDnode.
 *
 * NOTE(review): the replica array is indexed without a bound check here; the
 * caller is presumably expected to have capped the mnode count - confirm.
 *
 * Returns 0 on success, -1 when the redo action cannot be built.
 */
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb            *pSdb = pMnode->pSdb;
  SDCreateMnodeReq createReq = {0};
  int32_t          replicaCount = 0;
  void            *pIter = NULL;

  // Existing mnodes first.
  for (;;) {
    SMnodeObj *pExisting = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pExisting);
    if (pIter == NULL) {
      break;
    }

    createReq.replicas[replicaCount].id = pExisting->id;
    createReq.replicas[replicaCount].port = pExisting->pDnode->port;
    memcpy(createReq.replicas[replicaCount].fqdn, pExisting->pDnode->fqdn, TSDB_FQDN_LEN);
    replicaCount++;

    sdbRelease(pSdb, pExisting);
  }

  // The new member goes into the slot right after the existing ones.
  createReq.replicas[replicaCount].id = pDnode->id;
  createReq.replicas[replicaCount].port = pDnode->port;
  memcpy(createReq.replicas[replicaCount].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
  createReq.replica = replicaCount + 1;

  SEpSet createEpset = {0};
  createEpset.inUse = 0;
  createEpset.numOfEps = 1;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  return (mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset) != 0) ? -1 : 0;
}

S
Shengliang Guan 已提交
382
/*
 * Creates and prepares the serial "create-mnode" transaction for a new mnode
 * whose id equals pDnode->id: conflict check, redo action towards the dnode,
 * redo log (creating) and commit log (ready), then prepare.
 *
 * Returns 0 when the transaction was prepared, -1 otherwise. The local
 * transaction handle is dropped in every case.
 */
static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
  int32_t   code = -1;
  SMnodeObj newMnode = {0};

  newMnode.id = pDnode->id;
  newMnode.updateTime = newMnode.createdTime = taosGetTimestampMs();

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "create-mnode");
  if (pTrans != NULL) {
    mndTransSetSerial(pTrans);
    mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);

    // Each step runs only if all previous ones succeeded.
    if (mndTrancCheckConflict(pMnode, pTrans) == 0 &&
        mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &newMnode) == 0 &&
        mndSetCreateMnodeRedoLogs(pMnode, pTrans, &newMnode) == 0 &&
        mndSetCreateMnodeCommitLogs(pMnode, pTrans, &newMnode) == 0 &&
        mndTransPrepare(pMnode, pTrans) == 0) {
      code = 0;
    }
  }

  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
408 409
/*
 * Handler for TDMT_MND_CREATE_MNODE.
 *
 * Checks, in this order: the message deserializes, the user may create
 * mnodes, no mnode with that id exists yet, the target dnode exists, fewer
 * than 3 mnodes are present, and the target dnode is online. Each failed
 * check leaves its reason in terrno and the handler returns -1.
 *
 * On success the create transaction has been started and
 * TSDB_CODE_ACTION_IN_PROGRESS is returned.
 */
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SMCreateMnodeReq req = {0};
  SMnodeObj       *pExisting = NULL;
  SDnodeObj       *pTargetDnode = NULL;
  int32_t          code = -1;

  if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("mnode:%d, start to create", req.dnodeId);
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) {
    goto _OVER;
  }

  // The only acceptable outcome of the lookup is "mnode does not exist".
  pExisting = mndAcquireMnode(pMnode, req.dnodeId);
  if (pExisting != NULL) {
    terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
  }
  if (pExisting != NULL || terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
    goto _OVER;
  }

  pTargetDnode = mndAcquireDnode(pMnode, req.dnodeId);
  if (pTargetDnode == NULL) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
    terrno = TSDB_CODE_MND_TOO_MANY_MNODES;
    goto _OVER;
  }

  if (!mndIsDnodeOnline(pTargetDnode, taosGetTimestampMs())) {
    terrno = TSDB_CODE_DNODE_OFFLINE;
    goto _OVER;
  }

  code = mndCreateMnode(pMnode, pReq, pTargetDnode, &req);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("mnode:%d, failed to create since %s", req.dnodeId, terrstr());
  }

  mndReleaseMnode(pMnode, pExisting);
  mndReleaseDnode(pMnode, pTargetDnode);

  return code;
}

463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478
/*
 * Encodes pObj, appends the raw to the transaction's redo log and then marks
 * it SDB_STATUS_DROPPING. Returns 0 on success, -1 as soon as any step fails.
 */
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  // Steps run left to right; the first failure short-circuits the rest.
  if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING) != 0) {
    return -1;
  }
  return 0;
}

/*
 * Encodes pObj, appends the raw to the transaction's commit log and then
 * marks it SDB_STATUS_DROPPED. Returns 0 on success, -1 as soon as any step fails.
 */
static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  // Steps run left to right; the first failure short-circuits the rest.
  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
479 480
/*
 * Adds the redo log and the drop-mnode redo action for removing pObj (hosted
 * on pDnode) to pTrans. The order of the two depends on how many mnodes exist:
 *
 *   2 mnodes: redo log first, then the redo action. A forced drop is
 *             rejected with TSDB_CODE_DNODE_OFFLINE.
 *   3 mnodes: redo action first (skipped when force is set), then the redo log.
 *   other   : -1 is returned without touching terrno.
 *
 * Returns 0 on success, -1 on failure.
 */
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj,
                                          bool force) {
  SDDropMnodeReq dropReq = {0};
  SEpSet         dropEpSet = {0};

  dropReq.dnodeId = pDnode->id;
  dropEpSet.numOfEps = 1;
  dropEpSet.eps[0].port = pDnode->port;
  memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  int32_t totalMnodes = sdbGetSize(pMnode->pSdb, SDB_MNODE);
  switch (totalMnodes) {
    case 2:
      if (force) {
        mError("cant't force drop dnode, since a mnode on it and replica is 2");
        terrno = TSDB_CODE_DNODE_OFFLINE;
        return -1;
      }
      mInfo("vgId:1, has %d mnodes, exec redo log first", totalMnodes);
      if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1;
      // force is known to be false at this point, so the action is always sent.
      if (mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1;
      return 0;

    case 3:
      mInfo("vgId:1, has %d mnodes, exec redo action first", totalMnodes);
      if (!force && mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1;
      if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1;
      return 0;

    default:
      return -1;
  }
}

S
Shengliang Guan 已提交
516
/*
 * Adds everything needed to drop the mnode pObj to an existing transaction:
 * the redo log/action pair followed by the commit log.
 *
 * A NULL pObj is treated as "nothing to drop" and yields 0.
 * Returns 0 on success, -1 when any part cannot be added.
 */
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj, bool force) {
  if (pObj == NULL) {
    return 0;
  }

  if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj, force) != 0 ||
      mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
523
/*
 * Creates and prepares the serial "drop-mnode" transaction for pObj
 * (non-forced drop).
 *
 * Returns 0 when the transaction was prepared, -1 otherwise. The local
 * transaction handle is dropped in every case.
 */
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-mnode");

  if (pTrans != NULL) {
    mndTransSetSerial(pTrans);
    mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);

    // Each step runs only if all previous ones succeeded.
    if (mndTrancCheckConflict(pMnode, pTrans) == 0 &&
        mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false) == 0 &&
        mndTransPrepare(pMnode, pTrans) == 0) {
      code = 0;
    }
  }

  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
543 544
/*
 * Handler for TDMT_MND_DROP_MNODE.
 *
 * Checks, in this order: the message deserializes, the user may drop mnodes,
 * the dnode id is positive, the mnode exists, the target is not the mnode
 * running on this node (TSDB_CODE_MND_CANT_DROP_LEADER), more than one mnode
 * exists, and the hosting dnode is online. Each failed check leaves its
 * reason in terrno and the handler returns -1.
 *
 * On success the drop transaction has been started and
 * TSDB_CODE_ACTION_IN_PROGRESS is returned.
 */
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  SMDropMnodeReq req = {0};
  SMnodeObj     *pTarget = NULL;
  int32_t        code = -1;

  if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("mnode:%d, start to drop", req.dnodeId);
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
    goto _OVER;
  }

  if (req.dnodeId < 1) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // mndAcquireMnode() has already set terrno when it returns NULL.
  pTarget = mndAcquireMnode(pMnode, req.dnodeId);
  if (pTarget == NULL) {
    goto _OVER;
  }

  if (req.dnodeId == pMnode->selfDnodeId) {
    terrno = TSDB_CODE_MND_CANT_DROP_LEADER;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) < 2) {
    terrno = TSDB_CODE_MND_TOO_FEW_MNODES;
    goto _OVER;
  }

  if (!mndIsDnodeOnline(pTarget->pDnode, taosGetTimestampMs())) {
    terrno = TSDB_CODE_DNODE_OFFLINE;
    goto _OVER;
  }

  code = mndDropMnode(pMnode, pReq, pTarget);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("mnode:%d, failed to drop since %s", req.dnodeId, terrstr());
  }

  mndReleaseMnode(pMnode, pTarget);
  return code;
}

S
Shengliang Guan 已提交
596 597
/*
 * Retrieve callback behind the mnode "show" table. Appends up to `rows` rows
 * to pBlock, resuming from pShow->pIter, with these columns in order:
 * id, endpoint, role, status, create time.
 *
 * Role starts as "offline"; for the local mnode it becomes the leader string
 * (suffixed with "*" while the mnode is not restored); if the hosting dnode
 * is online it is replaced by the row's sync state, and a remote row that
 * claims leadership is shown with the error state string and logged.
 *
 * Status is "offline" when the dnode is not online, otherwise "dropping",
 * "creating" or "ready" according to the sdb row status.
 *
 * Returns the number of rows appended; pShow->numOfRows is advanced by it.
 */
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pRowObj = NULL;
  ESdbStatus rowStatus = 0;
  int32_t    filled = 0;
  int32_t    col = 0;
  int64_t    nowMs = taosGetTimestampMs();

  while (filled < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pRowObj, &rowStatus, true);
    if (pShow->pIter == NULL) {
      break;
    }

    col = 0;

    // id
    SColumnInfoData *pCol = taosArrayGet(pBlock->pDataBlock, col++);
    colDataAppend(pCol, filled, (const char *)&pRowObj->id, false);

    // endpoint
    char epBuf[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(epBuf, pRowObj->pDnode->ep, TSDB_EP_LEN + VARSTR_HEADER_SIZE);
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    colDataAppend(pCol, filled, epBuf, false);

    // role
    char role[20] = "offline";
    if (pRowObj->id == pMnode->selfDnodeId) {
      snprintf(role, sizeof(role), "%s%s", syncStr(TAOS_SYNC_STATE_LEADER), pMnode->restored ? "" : "*");
    }
    if (mndIsDnodeOnline(pRowObj->pDnode, nowMs)) {
      tstrncpy(role, syncStr(pRowObj->syncState), sizeof(role));
      if (pRowObj->id != pMnode->selfDnodeId && pRowObj->syncState == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(role, syncStr(TAOS_SYNC_STATE_ERROR), sizeof(role));
        mError("mnode:%d, is leader too", pRowObj->id);
      }
    }
    char roleBuf[12 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(roleBuf, role, pShow->pMeta->pSchemas[col].bytes);
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    colDataAppend(pCol, filled, (const char *)roleBuf, false);

    // status: offline wins over dropping, which wins over creating
    const char *status = "ready";
    if (!mndIsDnodeOnline(pRowObj->pDnode, nowMs)) {
      status = "offline";
    } else if (rowStatus == SDB_STATUS_DROPPING) {
      status = "dropping";
    } else if (rowStatus == SDB_STATUS_CREATING) {
      status = "creating";
    }
    char statusBuf[9 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(statusBuf, status, pShow->pMeta->pSchemas[col].bytes);
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    colDataAppend(pCol, filled, (const char *)statusBuf, false);

    // create time
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    colDataAppend(pCol, filled, (const char *)&pRowObj->createdTime, false);

    filled++;
    sdbRelease(pSdb, pRowObj);
  }

  pShow->numOfRows += filled;

  return filled;
}

/* Free-iterator callback of the mnode "show" table: cancels an unfinished sdb fetch. */
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}
S
Shengliang Guan 已提交
661 662

/*
 * Handler for TDMT_MND_ALTER_MNODE.
 *
 * The handler is currently a stub: the `#if 1` branch returns 0 without
 * looking at the request. The previous implementation (rewrite the mnode
 * file and reconfigure sync from the request's replica list) is kept,
 * compiled out, in the `#else` branch.
 */
static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
#if 1
  return 0;
#else
  SMnode         *pMnode = pReq->info.node;
  SDAlterMnodeReq alterReq = {0};

  if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMnodeOpt option = {.deploy = true, .numOfReplicas = alterReq.replica, .selfIndex = -1};
  memcpy(option.replicas, alterReq.replicas, sizeof(alterReq.replicas));
  for (int32_t i = 0; i < option.numOfReplicas; ++i) {
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) {
      option.selfIndex = i;
    }
  }

  if (option.selfIndex == -1) {
    // The format string has no conversion specifier, so no extra argument is passed.
    mInfo("alter mnode not processed since selfIndex is -1");
    return 0;
  }

  if (mndWriteFile(pMnode->path, &option) != 0) {
    mError("failed to write mnode file since %s", terrstr());
    return -1;
  }

  SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SNodeInfo *pNode = &cfg.nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, alterReq.replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = alterReq.replicas[i].port;
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) {
      cfg.myIndex = i;
    }
  }

  if (cfg.myIndex == -1) {
    mError("failed to alter mnode since myindex is -1");
    return -1;
  } else {
    mInfo("start to alter mnode sync, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex);
    for (int32_t i = 0; i < alterReq.replica; ++i) {
      SNodeInfo *pNode = &cfg.nodeInfo[i];
      mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
    }
  }

  int32_t code = syncReconfig(pMnode->syncMgmt.sync, &cfg);
  if (code != 0) {
    mError("failed to sync reconfig since %s", terrstr());
  } else {
    mInfo("alter mnode sync success");
  }

  return code;
#endif
}

/*
 * Rebuilds the sync membership from the mnode rows in the sdb and applies it
 * through syncReconfig().
 *
 * Rows in READY or CREATING state become members of the new configuration;
 * rows in CREATING or DROPPING state count as "updating". The reconfiguration
 * is attempted only when there is at least one ready row AND at least one
 * updating row, and only if this node itself is part of the new member list.
 * A failed syncReconfig() is logged and otherwise ignored.
 */
static void mndReloadSyncConfig(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SSyncCfg   cfg = {.myIndex = -1};
  SMnodeObj *pRowObj = NULL;
  ESdbStatus rowStatus = 0;
  int32_t    readyMnodes = 0;
  int32_t    updatingMnodes = 0;
  void      *pIter = NULL;

  for (;;) {
    pIter = sdbFetchAll(pSdb, SDB_MNODE, pIter, (void **)&pRowObj, &rowStatus, false);
    if (pIter == NULL) {
      break;
    }

    const bool isReady = (rowStatus == SDB_STATUS_READY);
    const bool isCreating = (rowStatus == SDB_STATUS_CREATING);
    const bool isDropping = (rowStatus == SDB_STATUS_DROPPING);

    if (isReady) {
      mInfo("vgId:1, has ready mnode:%d, status:%s", pRowObj->id, sdbStatusName(rowStatus));
      readyMnodes++;
    } else if (isCreating || isDropping) {
      mInfo("vgId:1, has updating mnode:%d, status:%s", pRowObj->id, sdbStatusName(rowStatus));
      updatingMnodes++;
    }

    // Dropping rows are left out of the new member list.
    if (isReady || isCreating) {
      SNodeInfo *pNode = &cfg.nodeInfo[cfg.replicaNum];
      pNode->nodeId = pRowObj->pDnode->id;
      pNode->clusterId = mndGetClusterId(pMnode);
      pNode->nodePort = pRowObj->pDnode->port;
      tstrncpy(pNode->nodeFqdn, pRowObj->pDnode->fqdn, TSDB_FQDN_LEN);
      (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
      mInfo("vgId:1, ep:%s:%u dnode:%d", pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
      if (pRowObj->pDnode->id == pMnode->selfDnodeId) {
        cfg.myIndex = cfg.replicaNum;
      }
      cfg.replicaNum++;
    }

    sdbReleaseLock(pSdb, pRowObj, false);
  }

  if (readyMnodes <= 0 || updatingMnodes <= 0) {
    mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes);
    return;
  }

  if (cfg.myIndex == -1) {
#if 1
    mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1");
#else
    // cannot reconfig because the leader may fail to elect after reboot
    mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1, do sync stop oper");
    syncStop(pMnode->syncMgmt.sync);
#endif
    return;
  }

  // updatingMnodes is known to be positive here because of the early return above.
  mInfo("vgId:1, mnode sync reconfig, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex);
  for (int32_t i = 0; i < cfg.replicaNum; ++i) {
    SNodeInfo *pNode = &cfg.nodeInfo[i];
    mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId,
          pNode->clusterId);
  }

  if (syncReconfig(pMnode->syncMgmt.sync, &cfg) != 0) {
    mError("vgId:1, mnode sync reconfig failed since %s", terrstr());
  } else {
    mInfo("vgId:1, mnode sync reconfig success");
  }
}