mndMnode.c 24.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndMnode.h"
#include "mndDnode.h"
H
Hongze Cheng 已提交
19
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
20
#include "mndShow.h"
21
#include "mndSync.h"
S
Shengliang Guan 已提交
22
#include "mndTrans.h"
H
Haojun Liao 已提交
23
#include "tmisce.h"
S
Shengliang Guan 已提交
24

25 26
#define MNODE_VER_NUMBER   1
#define MNODE_RESERVE_SIZE 64
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
static int32_t  mndCreateDefaultMnode(SMnode *pMnode);
S
Shengliang Guan 已提交
29
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj);
S
Shengliang Guan 已提交
30
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw);
S
Shengliang Guan 已提交
31 32
static int32_t  mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj);
static int32_t  mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj);
S
Shengliang Guan 已提交
33
static int32_t  mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew);
S
Shengliang Guan 已提交
34
static int32_t  mndProcessCreateMnodeReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
35
static int32_t  mndProcessAlterMnodeReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
36 37
static int32_t  mndProcessDropMnodeReq(SRpcMsg *pReq);
static int32_t  mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
38
static void     mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
39
static void     mndReloadSyncConfig(SMnode *pMnode);
S
Shengliang Guan 已提交
40 41

// Wires the mnode module into pMnode: registers the message handlers for
// create/alter/drop mnode (and their responses), the show-table retrieve and
// free-iterator handlers, and finally the SDB_MNODE table callbacks.
// Returns whatever sdbSetTable() returns.
int32_t mndInitMnode(SMnode *pMnode) {
  SSdbTable mnodeTable = {0};
  mnodeTable.sdbType = SDB_MNODE;
  mnodeTable.keyType = SDB_KEY_INT32;
  mnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultMnode;
  mnodeTable.encodeFp = (SdbEncodeFp)mndMnodeActionEncode;
  mnodeTable.decodeFp = (SdbDecodeFp)mndMnodeActionDecode;
  mnodeTable.insertFp = (SdbInsertFp)mndMnodeActionInsert;
  mnodeTable.updateFp = (SdbUpdateFp)mndMnodeActionUpdate;
  mnodeTable.deleteFp = (SdbDeleteFp)mndMnodeActionDelete;

  // Requests are handled locally; responses are routed to the transaction layer.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE, mndProcessAlterMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);

  return sdbSetTable(pMnode->pSdb, mnodeTable);
}

// Counterpart of mndInitMnode(); currently a no-op.
void mndCleanupMnode(SMnode *pMnode) { (void)pMnode; }

68 69
// Looks up the mnode object keyed by mnodeId in the SDB_MNODE table.
// Returns the object obtained from sdbAcquire() (to be handed back via
// mndReleaseMnode()), or NULL on failure. When the lookup failed with
// TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
// TSDB_CODE_MND_MNODE_NOT_EXIST; any other terrno is left untouched.
SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
  SMnodeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
  }
  return NULL;
}

76
// Hands pObj back to the sdb via sdbRelease().
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) { sdbRelease(pMnode->pSdb, pObj); }

S
Shengliang Guan 已提交
81 82 83 84 85 86 87 88
// Deploy callback of the mnode table (see mndInitMnode): builds the record for
// mnode id 1, stamped with the current time, and submits it as the commit log
// of a "create-mnode" transaction.
// Returns 0 on success, -1 on failure.
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
  SMnodeObj firstMnode = {.id = 1};
  firstMnode.createdTime = taosGetTimestampMs();
  firstMnode.updateTime = firstMnode.createdTime;

  SSdbRaw *pDeployRaw = mndMnodeActionEncode(&firstMnode);
  if (pDeployRaw == NULL) return -1;
  (void)sdbSetRawStatus(pDeployRaw, SDB_STATUS_READY);

  mInfo("mnode:%d, will be created when deploying, raw:%p", firstMnode.id, pDeployRaw);

  STrans *pDeployTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-mnode");
  if (pDeployTrans == NULL) {
    // No transaction took the raw, so it is freed here.
    sdbFreeRaw(pDeployRaw);
    mError("mnode:%d, failed to create since %s", firstMnode.id, terrstr());
    return -1;
  }
  mInfo("trans:%d, used to create mnode:%d", pDeployTrans->id, firstMnode.id);

  int32_t code = -1;
  if (mndTransAppendCommitlog(pDeployTrans, pDeployRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pDeployTrans->id, terrstr());
  } else {
    (void)sdbSetRawStatus(pDeployRaw, SDB_STATUS_READY);
    if (mndTransPrepare(pMnode, pDeployTrans) != 0) {
      mError("trans:%d, failed to prepare since %s", pDeployTrans->id, terrstr());
    } else {
      code = 0;
    }
  }

  mndTransDrop(pDeployTrans);
  return code;
}

S
Shengliang Guan 已提交
118
// Serializes pObj into a newly allocated SSdbRaw. Layout, in order:
// id (int32), createdTime (int64), updateTime (int64), then
// MNODE_RESERVE_SIZE reserved bytes.
// Returns the raw on success (terrno == 0). On failure the raw is freed,
// terrno holds the error and NULL is returned.
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
  int32_t offset = 0;

  // Pessimistic default: any early jump to _OVER reports out-of-memory.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, MNODE_VER_NUMBER, sizeof(SMnodeObj) + MNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, offset, pObj->id, _OVER)
  SDB_SET_INT64(pRaw, offset, pObj->createdTime, _OVER)
  SDB_SET_INT64(pRaw, offset, pObj->updateTime, _OVER)
  SDB_SET_RESERVE(pRaw, offset, MNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("mnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

// Rebuilds an SMnodeObj row from pRaw. Fields are read in the same order
// mndMnodeActionEncode() writes them: id, createdTime, updateTime, reserve.
// Returns the new row on success (terrno == 0). Returns NULL on failure with
// terrno set: TSDB_CODE_SDB_INVALID_DATA_VER when the stored version differs
// from MNODE_VER_NUMBER, otherwise the pessimistic default or whatever the
// SDB_GET_* helpers set before jumping to _OVER.
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
  // Declared and NULL-initialized up front: several goto _OVER paths are taken
  // before either pointer would otherwise be assigned, and the error path
  // below inspects both.
  SSdbRow   *pRow = NULL;
  SMnodeObj *pObj = NULL;
  int8_t     sver = 0;
  int32_t    dataPos = 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;

  if (sver != MNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SMnodeObj));
  if (pRow == NULL) goto _OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    // pObj may still be NULL here; log id 0 rather than dereferencing it.
    mError("mnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
    if (pRow != NULL) {
      taosMemoryFreeClear(pRow);
    }
    return NULL;
  }

  mTrace("mnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRow;
}

S
Shengliang Guan 已提交
179
// Insert callback of the mnode table. Acquires the dnode whose id equals the
// mnode id and stores it in pObj->pDnode, marks the sync state as
// TAOS_SYNC_STATE_ERROR and triggers a sync config reload.
// Returns 0 on success; -1 with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST when
// the dnode cannot be acquired.
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
  mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);

  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode != NULL) {
    pObj->syncState = TAOS_SYNC_STATE_ERROR;
    mndReloadSyncConfig(pSdb->pMnode);
    return 0;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
193
// Delete callback of the mnode table. Releases the dnode reference stored in
// pObj->pDnode (if any) and clears the pointer. Always returns 0.
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
  mTrace("mnode:%d, perform delete action, row:%p", pObj->id, pObj);

  if (pObj->pDnode == NULL) {
    return 0;
  }

  sdbRelease(pSdb, pObj->pDnode);
  pObj->pDnode = NULL;
  return 0;
}

S
Shengliang Guan 已提交
203
// Update callback of the mnode table. Copies updateTime from pNew into pOld
// and triggers a sync config reload. Always returns 0.
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
  SMnode *pOwner = NULL;

  mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;

  pOwner = pSdb->pMnode;
  mndReloadSyncConfig(pOwner);
  return 0;
}

// Returns true when the SDB_MNODE table holds an entry keyed by dnodeId.
// The object acquired for the check is released before returning.
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pFound = sdbAcquire(pSdb, SDB_MNODE, &dnodeId);
  bool       exists = (pFound != NULL);

  if (exists) {
    sdbRelease(pSdb, pFound);
  }
  return exists;
}

S
Shengliang Guan 已提交
223
// Fills pEpSet with the endpoints (fqdn/port) of the mnodes in the sdb.
// - With no mnode entries the set is taken from syncGetRetryEpSet().
// - When the local mnode (id == selfDnodeId) is reached, inUse is pointed at
//   its own slot if this node is leader, otherwise at the following slot
//   (modulo the mnode count).
// - If no endpoint was added, syncGetRetryEpSet() is used as fallback, and an
//   out-of-range inUse is reset to 0.
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t mnodeCount = sdbGetSize(pSdb, SDB_MNODE);

  if (mnodeCount == 0) {
    syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet);
    return;
  }

  void      *pCursor = NULL;
  SMnodeObj *pEntry = NULL;
  while ((pCursor = sdbFetch(pSdb, SDB_MNODE, pCursor, (void **)&pEntry)) != NULL) {
    if (pEntry->id == pMnode->selfDnodeId) {
      // numOfEps is the index the local endpoint gets if it is appended below.
      if (mndIsLeader(pMnode)) {
        pEpSet->inUse = pEpSet->numOfEps;
      } else {
        pEpSet->inUse = (pEpSet->numOfEps + 1) % mnodeCount;
      }
    }

    if (pEntry->pDnode != NULL) {
      addEpIntoEpSet(pEpSet, pEntry->pDnode->fqdn, pEntry->pDnode->port);
    }
    sdbRelease(pSdb, pEntry);
  }

  if (pEpSet->numOfEps == 0) {
    syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet);
  }

  if (pEpSet->inUse >= pEpSet->numOfEps) {
    pEpSet->inUse = 0;
  }
}

259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
// Encodes pObj, appends the raw to the redo log of pTrans and marks it
// SDB_STATUS_CREATING. Returns 0 on success, -1 if any step fails.
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  // Short-circuit evaluation keeps the steps in order: encode, append, set status.
  if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_CREATING) != 0) {
    return -1;
  }
  return 0;
}

// Encodes pObj, appends the raw to the undo log of pTrans and marks it
// SDB_STATUS_DROPPED. Returns 0 on success, -1 if any step fails.
static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  // Short-circuit evaluation keeps the steps in order: encode, append, set status.
  if (pRaw == NULL || mndTransAppendUndolog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

// Encodes pObj, appends the raw to the commit log of pTrans and marks it
// SDB_STATUS_READY. Returns 0 on success, -1 if any step fails.
static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  // Short-circuit evaluation keeps the steps in order: encode, append, set status.
  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }
  return 0;
}

283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
// Serializes pCreateReq into a heap buffer and appends it to pTrans as a
// TDMT_DND_CREATE_MNODE redo action addressed to pCreateEpSet.
// TSDB_CODE_NODE_ALREADY_DEPLOYED is registered as the action's acceptable code.
// Returns 0 on success. Returns -1 when the buffer cannot be allocated
// (terrno = TSDB_CODE_OUT_OF_MEMORY) or when appending the action fails; the
// buffer is freed here in the latter case.
static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pCreateReq, SEpSet *pCreateEpSet) {
  // First pass with a NULL buffer yields the encoded length.
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pCreateReq);
  void   *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    // Never let the serializer write through a NULL buffer.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSDCreateMnodeReq(pReq, contLen, pCreateReq);

  STransAction action = {
      .epSet = *pCreateEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_DND_CREATE_MNODE,
      .acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED,
  };

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(pReq);
    return -1;
  }
  return 0;
}

// Serializes pAlterReq into a heap buffer and appends it to pTrans as a
// TDMT_MND_ALTER_MNODE redo action addressed to pAlterEpSet (no extra
// acceptable code).
// Returns 0 on success. Returns -1 when the buffer cannot be allocated
// (terrno = TSDB_CODE_OUT_OF_MEMORY) or when appending the action fails; the
// buffer is freed here in the latter case.
static int32_t mndBuildAlterMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pAlterReq, SEpSet *pAlterEpSet) {
  // First pass with a NULL buffer yields the encoded length.
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterReq);
  void   *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    // Never let the serializer write through a NULL buffer.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSDCreateMnodeReq(pReq, contLen, pAlterReq);

  STransAction action = {
      .epSet = *pAlterEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_MND_ALTER_MNODE,
      .acceptableCode = 0,
  };

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(pReq);
    return -1;
  }

  return 0;
}

// Serializes pDropReq into a heap buffer and appends it to pTrans as a
// TDMT_DND_DROP_MNODE redo action addressed to pDroprEpSet.
// TSDB_CODE_NODE_NOT_DEPLOYED is registered as the action's acceptable code.
// Returns 0 on success. Returns -1 when the buffer cannot be allocated
// (terrno = TSDB_CODE_OUT_OF_MEMORY) or when appending the action fails; the
// buffer is freed here in the latter case.
static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDropReq, SEpSet *pDroprEpSet) {
  // First pass with a NULL buffer yields the encoded length.
  int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, pDropReq);
  void   *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    // Never let the serializer write through a NULL buffer.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSCreateDropMQSNodeReq(pReq, contLen, pDropReq);

  STransAction action = {
      .epSet = *pDroprEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_DND_DROP_MNODE,
      .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED,
  };

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(pReq);
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
344
// Builds the create-mnode request sent to pDnode: the replica list contains
// every mnode currently in the sdb followed by pDnode itself, and the target
// endpoint set holds only pDnode. The request is appended to pTrans as a redo
// action. Returns 0 on success, -1 on failure.
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb            *pSdb = pMnode->pSdb;
  SDCreateMnodeReq createReq = {0};
  SEpSet           targetEpSet = {0};
  int32_t          slot = 0;
  void            *pCursor = NULL;
  SMnodeObj       *pPeer = NULL;

  // Existing mnodes occupy the leading replica slots.
  while ((pCursor = sdbFetch(pSdb, SDB_MNODE, pCursor, (void **)&pPeer)) != NULL) {
    createReq.replicas[slot].id = pPeer->id;
    createReq.replicas[slot].port = pPeer->pDnode->port;
    memcpy(createReq.replicas[slot].fqdn, pPeer->pDnode->fqdn, TSDB_FQDN_LEN);
    slot++;

    sdbRelease(pSdb, pPeer);
  }

  // The dnode being promoted takes the next slot.
  createReq.replicas[slot].id = pDnode->id;
  createReq.replicas[slot].port = pDnode->port;
  memcpy(createReq.replicas[slot].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
  createReq.replica = slot + 1;

  targetEpSet.inUse = 0;
  targetEpSet.numOfEps = 1;
  targetEpSet.eps[0].port = pDnode->port;
  memcpy(targetEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  return (mndBuildCreateMnodeRedoAction(pTrans, &createReq, &targetEpSet) != 0) ? -1 : 0;
}

S
Shengliang Guan 已提交
379
// Creates and prepares the serial "create-mnode" transaction that turns pDnode
// into an mnode: redo action first, then redo log, then commit log.
// The transaction handle is dropped before returning in every case.
// Returns 0 on success, -1 on failure.
static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
  int32_t   code = -1;
  SMnodeObj newMnode = {0};

  newMnode.id = pDnode->id;
  newMnode.createdTime = taosGetTimestampMs();
  newMnode.updateTime = newMnode.createdTime;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "create-mnode");
  if (pTrans != NULL) {
    mndTransSetSerial(pTrans);
    mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);

    // && stops at the first failing step, preserving the step order.
    if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &newMnode) == 0 &&
        mndSetCreateMnodeRedoLogs(pMnode, pTrans, &newMnode) == 0 &&
        mndSetCreateMnodeCommitLogs(pMnode, pTrans, &newMnode) == 0 &&
        mndTransPrepare(pMnode, pTrans) == 0) {
      code = 0;
    }
  }

  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
404 405
// Handler for TDMT_MND_CREATE_MNODE. Validates the request, then starts the
// create-mnode transaction. Checks, in order:
//   1. the message deserializes            (else TSDB_CODE_INVALID_MSG)
//   2. the user holds MND_OPER_CREATE_MNODE
//   3. no mnode exists with that id        (else TSDB_CODE_MND_MNODE_ALREADY_EXIST)
//   4. the dnode exists                    (else TSDB_CODE_MND_DNODE_NOT_EXIST)
//   5. fewer than 3 mnodes exist           (else TSDB_CODE_MND_TOO_MANY_MNODES)
//   6. the dnode is online                 (else TSDB_CODE_NODE_OFFLINE)
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared,
// otherwise -1 with terrno describing the failure.
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SMCreateMnodeReq createReq = {0};
  SMnodeObj       *pExisting = NULL;
  SDnodeObj       *pTarget = NULL;
  int32_t          code = -1;

  if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("mnode:%d, start to create", createReq.dnodeId);
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) {
    goto _OVER;
  }

  pExisting = mndAcquireMnode(pMnode, createReq.dnodeId);
  if (pExisting != NULL) {
    terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
    goto _OVER;
  }
  // A failed lookup is only acceptable when it means "no such mnode".
  if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
    goto _OVER;
  }

  pTarget = mndAcquireDnode(pMnode, createReq.dnodeId);
  if (pTarget == NULL) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
    terrno = TSDB_CODE_MND_TOO_MANY_MNODES;
    goto _OVER;
  }

  if (!mndIsDnodeOnline(pTarget, taosGetTimestampMs())) {
    terrno = TSDB_CODE_NODE_OFFLINE;
    goto _OVER;
  }

  code = mndCreateMnode(pMnode, pReq, pTarget, &createReq);
  code = (code == 0) ? TSDB_CODE_ACTION_IN_PROGRESS : code;

_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
  }

  mndReleaseMnode(pMnode, pExisting);
  mndReleaseDnode(pMnode, pTarget);

  return code;
}

459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
// Encodes pObj, appends the raw to the redo log of pTrans and marks it
// SDB_STATUS_DROPPING. Returns 0 on success, -1 if any step fails.
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  // Short-circuit evaluation keeps the steps in order: encode, append, set status.
  if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING) != 0) {
    return -1;
  }
  return 0;
}

// Encodes pObj, appends the raw to the commit log of pTrans and marks it
// SDB_STATUS_DROPPED. Returns 0 on success, -1 if any step fails.
static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);

  // Short-circuit evaluation keeps the steps in order: encode, append, set status.
  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
475 476
// Adds the redo log and (unless force is set) the drop-mnode redo action for
// pObj to pTrans. The order depends on how many mnodes exist:
//   2 mnodes: redo log first, then redo action; a forced drop is refused with
//             terrno = TSDB_CODE_NODE_OFFLINE.
//   3 mnodes: redo action first (skipped when force), then redo log.
//   any other count: returns -1 (terrno is not set on this path).
// Returns 0 on success, -1 on failure.
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj,
                                          bool force) {
  SDDropMnodeReq dropReq = {0};
  SEpSet         dropEpSet = {0};

  dropReq.dnodeId = pDnode->id;
  dropEpSet.numOfEps = 1;
  dropEpSet.eps[0].port = pDnode->port;
  memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  int32_t totalMnodes = sdbGetSize(pMnode->pSdb, SDB_MNODE);
  switch (totalMnodes) {
    case 2:
      if (force) {
        mError("cant't force drop dnode, since a mnode on it and replica is 2");
        terrno = TSDB_CODE_NODE_OFFLINE;
        return -1;
      }
      mInfo("vgId:1, has %d mnodes, exec redo log first", totalMnodes);
      if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1;
      // force is known to be false here, so the action is always built.
      if (mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1;
      return 0;

    case 3:
      mInfo("vgId:1, has %d mnodes, exec redo action first", totalMnodes);
      if (!force && mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1;
      if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1;
      return 0;

    default:
      return -1;
  }
}

S
Shengliang Guan 已提交
512
// Adds everything needed to drop mnode pObj to pTrans: the redo actions/logs
// followed by the commit log. A NULL pObj is treated as "nothing to do".
// Returns 0 on success (or when pObj is NULL), -1 on failure.
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj, bool force) {
  if (pObj == NULL) {
    return 0;
  }

  // || stops at the first failing step, so commit logs are only added after
  // the redo actions succeeded.
  if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj, force) != 0 ||
      mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
519
// Creates and prepares the serial "drop-mnode" transaction for pObj
// (non-forced drop). The transaction handle is dropped before returning in
// every case. Returns 0 on success, -1 on failure.
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-mnode");

  if (pTrans != NULL) {
    mndTransSetSerial(pTrans);
    mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);

    if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false) == 0 && mndTransPrepare(pMnode, pTrans) == 0) {
      code = 0;
    }
  }

  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
538 539
// Handler for TDMT_MND_DROP_MNODE. Validates the request, then starts the
// drop-mnode transaction. Checks, in order:
//   1. the message deserializes          (else TSDB_CODE_INVALID_MSG)
//   2. the user holds MND_OPER_DROP_MNODE
//   3. dnodeId is positive               (else TSDB_CODE_INVALID_MSG)
//   4. the mnode exists                  (terrno as set by mndAcquireMnode)
//   5. the target is not this node       (else TSDB_CODE_MND_CANT_DROP_LEADER)
//   6. more than one mnode exists        (else TSDB_CODE_MND_TOO_FEW_MNODES)
//   7. the hosting dnode is online       (else TSDB_CODE_NODE_OFFLINE)
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared,
// otherwise -1 with terrno describing the failure.
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  SMDropMnodeReq dropReq = {0};
  SMnodeObj     *pTarget = NULL;
  int32_t        code = -1;

  if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("mnode:%d, start to drop", dropReq.dnodeId);
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
    goto _OVER;
  }

  if (dropReq.dnodeId <= 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  pTarget = mndAcquireMnode(pMnode, dropReq.dnodeId);
  if (pTarget == NULL) {
    goto _OVER;
  }

  if (dropReq.dnodeId == pMnode->selfDnodeId) {
    terrno = TSDB_CODE_MND_CANT_DROP_LEADER;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
    terrno = TSDB_CODE_MND_TOO_FEW_MNODES;
    goto _OVER;
  }

  if (!mndIsDnodeOnline(pTarget->pDnode, taosGetTimestampMs())) {
    terrno = TSDB_CODE_NODE_OFFLINE;
    goto _OVER;
  }

  code = mndDropMnode(pMnode, pReq, pTarget);
  code = (code == 0) ? TSDB_CODE_ACTION_IN_PROGRESS : code;

_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
  }

  mndReleaseMnode(pMnode, pTarget);
  return code;
}

S
Shengliang Guan 已提交
591 592
// Show-table retrieve handler for TSDB_MGMT_TABLE_MNODE. Appends up to `rows`
// mnode entries to pBlock, resuming from pShow->pIter. Columns, in order:
// id, endpoint, role, status, create time.
// Role starts as "offline"; the local node is shown as leader (suffixed with
// "*" while pMnode->restored is false); when the hosting dnode is online the
// role becomes the recorded sync state, except that a remote entry recorded as
// leader is shown as the error state and logged.
// Adds the produced count to pShow->numOfRows and returns it.
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pObj = NULL;
  ESdbStatus objStatus = 0;
  int64_t    curMs = taosGetTimestampMs();
  int32_t    numOfRows = 0;

  for (; numOfRows < rows; numOfRows++) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus, true);
    if (pShow->pIter == NULL) break;

    int32_t          cols = 0;
    SColumnInfoData *pColInfo = NULL;
    bool             isSelf = (pObj->id == pMnode->selfDnodeId);

    // id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pObj->id, false);

    // endpoint
    char b1[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(b1, pObj->pDnode->ep, TSDB_EP_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, b1, false);

    // role
    char role[20] = "offline";
    if (isSelf) {
      snprintf(role, sizeof(role), "%s%s", syncStr(TAOS_SYNC_STATE_LEADER), pMnode->restored ? "" : "*");
    }
    if (mndIsDnodeOnline(pObj->pDnode, curMs)) {
      tstrncpy(role, syncStr(pObj->syncState), sizeof(role));
      if (!isSelf && pObj->syncState == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(role, syncStr(TAOS_SYNC_STATE_ERROR), sizeof(role));
        mError("mnode:%d, is leader too", pObj->id);
      }
    }
    char b2[12 + VARSTR_HEADER_SIZE] = {0};
    // The copy limit comes from the schema entry of the current column, which
    // is why it is read before cols is advanced.
    STR_WITH_MAXSIZE_TO_VARSTR(b2, role, pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)b2, false);

    // status
    const char *status = (objStatus == SDB_STATUS_DROPPING)   ? "dropping"
                         : (objStatus == SDB_STATUS_CREATING) ? "creating"
                                                              : "ready";
    char b3[9 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(b3, status, pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)b3, false);

    // create time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);

    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += numOfRows;

  return numOfRows;
}

// Show-table free-iterator handler: cancels the sdb fetch behind pIter.
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) { sdbCancelFetch(pMnode->pSdb, pIter); }
S
Shengliang Guan 已提交
655 656

// Handler for TDMT_MND_ALTER_MNODE.
// The active build (#if 1) accepts the message and returns 0 without acting.
// The #else branch keeps the disabled implementation: it deserializes the
// request, writes the mnode option file, builds an SSyncCfg from the replica
// list and calls syncReconfig().
static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
#if 1
  return 0;
#else
  SMnode         *pMnode = pReq->info.node;
  SDAlterMnodeReq alterReq = {0};

  if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMnodeOpt option = {.deploy = true, .numOfReplicas = alterReq.replica, .selfIndex = -1};
  memcpy(option.replicas, alterReq.replicas, sizeof(alterReq.replicas));
  for (int32_t i = 0; i < option.numOfReplicas; ++i) {
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) {
      option.selfIndex = i;
    }
  }

  if (option.selfIndex == -1) {
    // No argument is passed: the format string has no conversion specifier.
    mInfo("alter mnode not processed since selfIndex is -1");
    return 0;
  }

  if (mndWriteFile(pMnode->path, &option) != 0) {
    mError("failed to write mnode file since %s", terrstr());
    return -1;
  }

  SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SNodeInfo *pNode = &cfg.nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, alterReq.replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = alterReq.replicas[i].port;
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) {
      cfg.myIndex = i;
    }
  }

  if (cfg.myIndex == -1) {
    mError("failed to alter mnode since myindex is -1");
    return -1;
  } else {
    mInfo("start to alter mnode sync, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex);
    for (int32_t i = 0; i < alterReq.replica; ++i) {
      SNodeInfo *pNode = &cfg.nodeInfo[i];
      mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
    }
  }

  int32_t code = syncReconfig(pMnode->syncMgmt.sync, &cfg);
  if (code != 0) {
    mError("failed to sync reconfig since %s", terrstr());
  } else {
    mInfo("alter mnode sync success");
  }

  return code;
#endif
}

// Rebuilds the sync configuration from the mnode table and applies it with
// syncReconfig(). Entries in READY or CREATING state become replicas;
// CREATING and DROPPING entries count as "updating".
// Nothing is applied unless there is at least one ready and at least one
// updating mnode, and the local dnode is part of the replica list.
static void mndReloadSyncConfig(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SSyncCfg   cfg = {.myIndex = -1};
  int32_t    updatingMnodes = 0;
  int32_t    readyMnodes = 0;
  SMnodeObj *pObj = NULL;
  ESdbStatus objStatus = 0;
  void      *pIter = NULL;

  while ((pIter = sdbFetchAll(pSdb, SDB_MNODE, pIter, (void **)&pObj, &objStatus, false)) != NULL) {
    switch (objStatus) {
      case SDB_STATUS_CREATING:
      case SDB_STATUS_DROPPING:
        mInfo("vgId:1, has updating mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus));
        updatingMnodes++;
        break;
      case SDB_STATUS_READY:
        mInfo("vgId:1, has ready mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus));
        readyMnodes++;
        break;
      default:
        break;
    }

    if (objStatus == SDB_STATUS_READY || objStatus == SDB_STATUS_CREATING) {
      int32_t    slot = cfg.replicaNum;
      SNodeInfo *pNode = &cfg.nodeInfo[slot];
      tstrncpy(pNode->nodeFqdn, pObj->pDnode->fqdn, sizeof(pNode->nodeFqdn));
      pNode->nodePort = pObj->pDnode->port;
      if (pObj->pDnode->id == pMnode->selfDnodeId) {
        cfg.myIndex = slot;
      }
      cfg.replicaNum = slot + 1;
    }

    sdbReleaseLock(pSdb, pObj, false);
  }

  if (readyMnodes <= 0 || updatingMnodes <= 0) {
    mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes);
    return;
  }

  if (cfg.myIndex == -1) {
#if 1
    mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1");
#else
    // cannot reconfig because the leader may fail to elect after reboot
    mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1, do sync stop oper");
    syncStop(pMnode->syncMgmt.sync);
#endif
    return;
  }

  // updatingMnodes > 0 is guaranteed by the early return above.
  mInfo("vgId:1, mnode sync reconfig, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex);
  for (int32_t i = 0; i < cfg.replicaNum; ++i) {
    SNodeInfo *pNode = &cfg.nodeInfo[i];
    mInfo("vgId:1, index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
  }

  if (syncReconfig(pMnode->syncMgmt.sync, &cfg) != 0) {
    mError("vgId:1, failed to reconfig mnode sync since %s", terrstr());
  } else {
    mInfo("vgId:1, reconfig mnode sync success");
  }
}