mnodeMnode.c 12.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
S
slguan 已提交
17
#include "os.h"
18
#include "taoserror.h"
S
slguan 已提交
19
#include "trpc.h"
S
slguan 已提交
20
#include "tsync.h"
S
slguan 已提交
21
#include "tbalance.h"
S
slguan 已提交
22 23 24
#include "tutil.h"
#include "ttime.h"
#include "tsocket.h"
25
#include "tdataformat.h"
S
Shengliang Guan 已提交
26 27 28 29 30
#include "mnodeDef.h"
#include "mnodeInt.h"
#include "mnodeMnode.h"
#include "mnodeDnode.h"
#include "mnodeSdb.h"
31
#include "mnodeShow.h"
S
Shengliang Guan 已提交
32
#include "mnodeUser.h"
S
slguan 已提交
33

34 35
static void *        tsMnodeSdb = NULL;
static int32_t       tsMnodeUpdateSize = 0;
36 37
static SRpcIpSet     tsMnodeIpSetForShell;
static SRpcIpSet     tsMnodeIpSetForPeer;
38
static SDMMnodeInfos tsMnodeInfos;
39 40
static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
41 42

/*
 * Lock guarding the cached mnode ip sets / infos.
 * With LINUX defined a pthread rwlock is used; otherwise a plain mutex is used,
 * so the "read" and "write" lock macros expand to the same exclusive lock.
 */
#if defined(LINUX)
  static pthread_rwlock_t         tsMnodeLock;
  #define mnodeMnodeInitLock()    pthread_rwlock_init(&tsMnodeLock, NULL)
  #define mnodeMnodeDestroyLock() pthread_rwlock_destroy(&tsMnodeLock)
  #define mnodeMnodeRdLock()      pthread_rwlock_rdlock(&tsMnodeLock)
  #define mnodeMnodeWrLock()      pthread_rwlock_wrlock(&tsMnodeLock)
  #define mnodeMnodeUnLock()      pthread_rwlock_unlock(&tsMnodeLock)
#else
  static pthread_mutex_t          tsMnodeLock;
  #define mnodeMnodeInitLock()    pthread_mutex_init(&tsMnodeLock, NULL)
  #define mnodeMnodeDestroyLock() pthread_mutex_destroy(&tsMnodeLock)
  #define mnodeMnodeRdLock()      pthread_mutex_lock(&tsMnodeLock)
  #define mnodeMnodeWrLock()      pthread_mutex_lock(&tsMnodeLock)
  #define mnodeMnodeUnLock()      pthread_mutex_unlock(&tsMnodeLock)
#endif

58
/*
 * sdb "destroy" action for the mnodes table: releases the row object
 * attached to the operation and always reports success.
 */
static int32_t mnodeMnodeActionDestroy(SSdbOper *pOper) {
  // NOTE(review): tfree() presumably frees and clears the pointer - confirm against its definition
  tfree(pOper->pObj);

  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
62

63
/*
 * sdb "insert" action for the mnodes table.
 * Looks up the dnode whose id equals the new mnode's id and flags it as a
 * management node. Returns TSDB_CODE_DNODE_NOT_EXIST when no such dnode exists.
 */
static int32_t mnodeMnodeActionInsert(SSdbOper *pOper) {
  SMnodeObj *pInserted = pOper->pObj;

  SDnodeObj *pHost = mnodeGetDnode(pInserted->mnodeId);
  if (pHost == NULL) {
    return TSDB_CODE_DNODE_NOT_EXIST;
  }

  pHost->isMgmt = true;
  mnodeDecDnodeRef(pHost);  // drop the reference obtained from mnodeGetDnode()

  return TSDB_CODE_SUCCESS;
}

74
/*
 * sdb "delete" action for the mnodes table.
 * Clears the management flag on the dnode with the same id as the removed
 * mnode. Returns TSDB_CODE_DNODE_NOT_EXIST when that dnode cannot be found.
 */
static int32_t mnodeMnodeActionDelete(SSdbOper *pOper) {
  SMnodeObj *pRemoved = pOper->pObj;

  SDnodeObj *pHost = mnodeGetDnode(pRemoved->mnodeId);
  if (pHost == NULL) {
    return TSDB_CODE_DNODE_NOT_EXIST;
  }

  pHost->isMgmt = false;
  mnodeDecDnodeRef(pHost);  // drop the reference obtained from mnodeGetDnode()

  mTrace("mnode:%d, is dropped from sdb", pRemoved->mnodeId);
  return TSDB_CODE_SUCCESS;
}

86
/*
 * sdb "update" action for the mnodes table.
 *
 * Looks up the stored row with the same mnodeId as the incoming object. When
 * the incoming object is a different instance, its first pOper->rowSize bytes
 * are copied over the stored row and the incoming object is freed.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_DNODE_NOT_EXIST when no stored row
 * exists for that id (previously this case passed NULL to memcpy).
 */
static int32_t mnodeMnodeActionUpdate(SSdbOper *pOper) {
  SMnodeObj *pMnode = pOper->pObj;
  SMnodeObj *pSaved = mnodeGetMnode(pMnode->mnodeId);

  if (pSaved == NULL) {
    // Nothing to update and no reference to release.
    // NOTE(review): pMnode is left untouched here; confirm who owns it on this path.
    return TSDB_CODE_DNODE_NOT_EXIST;
  }

  if (pMnode != pSaved) {
    memcpy(pSaved, pMnode, pOper->rowSize);
    free(pMnode);
  }

  mnodeDecMnodeRef(pSaved);  // drop the reference obtained from mnodeGetMnode()
  return TSDB_CODE_SUCCESS;
}

97
/*
 * sdb "encode" action: serializes an mnode object by copying its first
 * tsMnodeUpdateSize bytes into pOper->rowData and recording that length
 * in pOper->rowSize.
 */
static int32_t mnodeMnodeActionEncode(SSdbOper *pOper) {
  SMnodeObj *pSource = pOper->pObj;

  pOper->rowSize = tsMnodeUpdateSize;
  memcpy(pOper->rowData, pSource, tsMnodeUpdateSize);

  return TSDB_CODE_SUCCESS;
}

104
/*
 * sdb "decode" action: allocates a zeroed SMnodeObj, fills its first
 * tsMnodeUpdateSize bytes from pOper->rowData and hands it back in pOper->pObj.
 * Returns TSDB_CODE_SERV_OUT_OF_MEMORY when the allocation fails.
 */
static int32_t mnodeMnodeActionDecode(SSdbOper *pOper) {
  SMnodeObj *pDecoded = calloc(1, sizeof(SMnodeObj));
  if (pDecoded == NULL) {
    return TSDB_CODE_SERV_OUT_OF_MEMORY;
  }

  memcpy(pDecoded, pOper->rowData, tsMnodeUpdateSize);
  pOper->pObj = pDecoded;

  return TSDB_CODE_SUCCESS;
}

113 114
static int32_t mnodeMnodeActionRestored() {
  if (mnodeGetMnodesNum() == 1) {
S
slguan 已提交
115
    SMnodeObj *pMnode = NULL;
116
    void *pIter = mnodeGetNextMnode(NULL, &pMnode);
S
slguan 已提交
117 118
    if (pMnode != NULL) {
      pMnode->role = TAOS_SYNC_ROLE_MASTER;
119
      mnodeDecMnodeRef(pMnode);
S
slguan 已提交
120
    }
S
Shengliang Guan 已提交
121
    sdbFreeIter(pIter);
S
slguan 已提交
122
  }
123

124
  mnodeUpdateMnodeIpSet();
125

S
slguan 已提交
126 127
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
128

129 130
int32_t mnodeInitMnodes() {
  mnodeMnodeInitLock();
131

S
slguan 已提交
132 133 134 135 136 137
  SMnodeObj tObj;
  tsMnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;

  SSdbTableDesc tableDesc = {
    .tableId      = SDB_TABLE_MNODE,
    .tableName    = "mnodes",
S
Shengliang Guan 已提交
138
    .hashSessions = TSDB_DEFAULT_MNODES_HASH_SIZE,
S
slguan 已提交
139 140 141
    .maxRowSize   = tsMnodeUpdateSize,
    .refCountPos  = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
    .keyType      = SDB_KEY_INT,
142 143 144 145 146 147 148
    .insertFp     = mnodeMnodeActionInsert,
    .deleteFp     = mnodeMnodeActionDelete,
    .updateFp     = mnodeMnodeActionUpdate,
    .encodeFp     = mnodeMnodeActionEncode,
    .decodeFp     = mnodeMnodeActionDecode,
    .destroyFp    = mnodeMnodeActionDestroy,
    .restoredFp   = mnodeMnodeActionRestored
S
slguan 已提交
149 150 151 152 153 154 155
  };

  tsMnodeSdb = sdbOpenTable(&tableDesc);
  if (tsMnodeSdb == NULL) {
    mError("failed to init mnodes data");
    return -1;
  }
S
slguan 已提交
156

157 158
  mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mnodeGetMnodeMeta);
  mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mnodeRetrieveMnodes);
S
slguan 已提交
159

S
slguan 已提交
160
  mTrace("table:mnodes table is created");
S
slguan 已提交
161 162 163
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
164
void mnodeCleanupMnodes() {
S
slguan 已提交
165
  sdbCloseTable(tsMnodeSdb);
166
  mnodeMnodeDestroyLock();
S
slguan 已提交
167 168
}

169
int32_t mnodeGetMnodesNum() { 
S
slguan 已提交
170 171 172
  return sdbGetNumOfRows(tsMnodeSdb); 
}

173
void *mnodeGetMnode(int32_t mnodeId) {
S
slguan 已提交
174 175 176
  return sdbGetRow(tsMnodeSdb, &mnodeId);
}

177
/* Forwards pMnode to sdbIncRef() for the "mnodes" table. */
void mnodeIncMnodeRef(SMnodeObj *pMnode) {
  void *pTable = tsMnodeSdb;
  sdbIncRef(pTable, pMnode);
}

181
/* Forwards pMnode to sdbDecRef() for the "mnodes" table. */
void mnodeDecMnodeRef(SMnodeObj *pMnode) {
  void *pTable = tsMnodeSdb;
  sdbDecRef(pTable, pMnode);
}

185
/*
 * Advances an iteration over the "mnodes" table via sdbFetchRow().
 * pIter is the iterator from the previous call (NULL to start); the fetched
 * row is stored through pMnode. Returns the iterator for the next call.
 * Callers in this file stop when *pMnode comes back NULL.
 */
void *mnodeGetNextMnode(void *pIter, SMnodeObj **pMnode) {
  void **ppRow = (void **)pMnode;
  return sdbFetchRow(tsMnodeSdb, pIter, ppRow);
}

189
char *mnodeGetMnodeRoleStr(int32_t role) {
S
slguan 已提交
190 191 192 193 194 195 196 197 198 199 200
  switch (role) {
    case TAOS_SYNC_ROLE_OFFLINE:
      return "offline";
    case TAOS_SYNC_ROLE_UNSYNCED:
      return "unsynced";
    case TAOS_SYNC_ROLE_SLAVE:
      return "slave";
    case TAOS_SYNC_ROLE_MASTER:
      return "master";
    default:
      return "undefined";
S
slguan 已提交
201 202 203
  }
}

204 205
void mnodeUpdateMnodeIpSet() {
  mPrint("update mnodes ipset, numOfIps:%d ", mnodeGetMnodesNum());
206

207
  mnodeMnodeWrLock();
S
slguan 已提交
208

209 210 211
  memset(&tsMnodeIpSetForShell, 0, sizeof(SRpcIpSet));
  memset(&tsMnodeIpSetForPeer, 0, sizeof(SRpcIpSet));
  memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos));
212

S
slguan 已提交
213
  int32_t index = 0;
214
  void *  pIter = NULL;
S
slguan 已提交
215 216
  while (1) {
    SMnodeObj *pMnode = NULL;
217
    pIter = mnodeGetNextMnode(pIter, &pMnode);
S
slguan 已提交
218 219
    if (pMnode == NULL) break;

220
    SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
221
    if (pDnode != NULL) {
222 223 224
      strcpy(tsMnodeIpSetForShell.fqdn[index], pDnode->dnodeFqdn);
      tsMnodeIpSetForShell.port[index] = htons(pDnode->dnodePort);
      mTrace("mnode:%d, for shell fqdn:%s %d", pDnode->dnodeId, tsMnodeIpSetForShell.fqdn[index], htons(tsMnodeIpSetForShell.port[index]));      
225

226 227 228
      strcpy(tsMnodeIpSetForPeer.fqdn[index], pDnode->dnodeFqdn);
      tsMnodeIpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE);
      mTrace("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeIpSetForPeer.fqdn[index], htons(tsMnodeIpSetForPeer.port[index]));
229

230 231
      tsMnodeInfos.nodeInfos[index].nodeId = htonl(pMnode->mnodeId);
      strcpy(tsMnodeInfos.nodeInfos[index].nodeEp, pDnode->dnodeEp);
232

233
      if (pMnode->role == TAOS_SYNC_ROLE_MASTER) {
234 235 236
        tsMnodeIpSetForShell.inUse = index;
        tsMnodeIpSetForPeer.inUse = index;
        tsMnodeInfos.inUse = index;
237
      }
S
slguan 已提交
238

239
      mPrint("mnode:%d, ep:%s %s", pDnode->dnodeId, pDnode->dnodeEp, pMnode->role == TAOS_SYNC_ROLE_MASTER ? "master" : "");
240 241 242
      index++;
    }

243 244
    mnodeDecDnodeRef(pDnode);
    mnodeDecMnodeRef(pMnode);
S
slguan 已提交
245 246
  }

247 248 249
  tsMnodeInfos.nodeNum = index;
  tsMnodeIpSetForShell.numOfIps = index;
  tsMnodeIpSetForPeer.numOfIps = index;
250

251 252
  sdbFreeIter(pIter);

253 254 255 256 257
  mnodeMnodeUnLock();
}

/* Copies the cached peer-facing mnode ip set into *ipSet while holding the read lock. */
void mnodeGetMnodeIpSetForPeer(SRpcIpSet *ipSet) {
  mnodeMnodeRdLock();

  memcpy(ipSet, &tsMnodeIpSetForPeer, sizeof(SRpcIpSet));

  mnodeMnodeUnLock();
}

262 263
/* Copies the cached shell-facing mnode ip set into *ipSet while holding the read lock. */
void mnodeGetMnodeIpSetForShell(SRpcIpSet *ipSet) {
  mnodeMnodeRdLock();

  memcpy(ipSet, &tsMnodeIpSetForShell, sizeof(SRpcIpSet));

  mnodeMnodeUnLock();
}

268 269
void mnodeGetMnodeInfos(void *mnodeInfos) {
  mnodeMnodeRdLock();
270
  *(SDMMnodeInfos *)mnodeInfos = tsMnodeInfos;
271
  mnodeMnodeUnLock();
S
slguan 已提交
272 273
}

274
int32_t mnodeAddMnode(int32_t dnodeId) {
S
slguan 已提交
275 276 277 278
  SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
  pMnode->mnodeId = dnodeId;
  pMnode->createdTime = taosGetTimestampMs();

S
slguan 已提交
279
  SSdbOper oper = {
S
slguan 已提交
280 281 282 283 284 285 286 287 288 289 290
    .type = SDB_OPER_GLOBAL,
    .table = tsMnodeSdb,
    .pObj = pMnode,
  };

  int32_t code = sdbInsertRow(&oper);
  if (code != TSDB_CODE_SUCCESS) {
    tfree(pMnode);
    code = TSDB_CODE_SDB_ERROR;
  }

291
  mnodeUpdateMnodeIpSet();
292

S
slguan 已提交
293 294 295
  return code;
}

296 297
void mnodeDropMnodeLocal(int32_t dnodeId) {
  SMnodeObj *pMnode = mnodeGetMnode(dnodeId);
298 299 300
  if (pMnode != NULL) {
    SSdbOper oper = {.type = SDB_OPER_LOCAL, .table = tsMnodeSdb, .pObj = pMnode};
    sdbDeleteRow(&oper);
301
    mnodeDecMnodeRef(pMnode);
302
  }
303

304
  mnodeUpdateMnodeIpSet();
305 306
}

307 308
int32_t mnodeDropMnode(int32_t dnodeId) {
  SMnodeObj *pMnode = mnodeGetMnode(dnodeId);
S
slguan 已提交
309 310 311 312
  if (pMnode == NULL) {
    return TSDB_CODE_DNODE_NOT_EXIST;
  }
  
S
slguan 已提交
313
  SSdbOper oper = {
S
slguan 已提交
314 315 316 317 318 319 320 321 322 323 324
    .type = SDB_OPER_GLOBAL,
    .table = tsMnodeSdb,
    .pObj = pMnode
  };

  int32_t code = sdbDeleteRow(&oper);
  if (code != TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_SDB_ERROR;
  }

  sdbDecRef(tsMnodeSdb, pMnode);
325

326
  mnodeUpdateMnodeIpSet();
327

S
slguan 已提交
328 329 330
  return code;
}

331
static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
S
slguan 已提交
332
  sdbUpdateMnodeRoles();
333
  SUserObj *pUser = mnodeGetUserFromConn(pConn);
S
slguan 已提交
334 335
  if (pUser == NULL) return 0;

S
slguan 已提交
336
  if (strcmp(pUser->pAcct->user, "root") != 0)  {
337
    mnodeDecUserRef(pUser);
S
slguan 已提交
338 339
    return TSDB_CODE_NO_RIGHTS;
  }
S
slguan 已提交
340

341
  int32_t  cols = 0;
H
hjxilinx 已提交
342
  SSchema *pSchema = pMeta->schema;
S
slguan 已提交
343

344 345 346 347 348 349
  pShow->bytes[cols] = 2;
  pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
  strcpy(pSchema[cols].name, "id");
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;

350
  pShow->bytes[cols] = 40 + VARSTR_HEADER_SIZE;
351
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
352
  strcpy(pSchema[cols].name, "end_point");
353 354 355
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;

S
slguan 已提交
356
  pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
S
slguan 已提交
357 358 359 360
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "role");
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;
S
slguan 已提交
361 362 363
  
  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
364
  strcpy(pSchema[cols].name, "create_time");
S
slguan 已提交
365 366
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;
S
slguan 已提交
367 368 369 370 371 372 373 374 375

  pMeta->numOfColumns = htons(cols);
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

376
  pShow->numOfRows = mnodeGetMnodesNum();
S
slguan 已提交
377
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
S
Shengliang Guan 已提交
378
  pShow->pIter = NULL;
379
  mnodeDecUserRef(pUser);
S
slguan 已提交
380 381 382 383

  return 0;
}

384
/*
 * Produces up to `rows` rows of "show mnodes" output into `data`.
 *
 * Iteration state lives in pShow->pIter, so successive calls continue where
 * the previous one stopped. The output is written column by column: the cell
 * for column c of row r is at data + offset[c] * rows + bytes[c] * r.
 * Columns: mnode id (int16), dnode endpoint or "invalid ep", role name,
 * creation time (int64).
 *
 * Returns the number of rows written and adds it to pShow->numOfReads.
 * pConn is not used.
 */
static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
  int32_t    numOfRows = 0;
  SMnodeObj *pMnode = NULL;

  for (; numOfRows < rows; numOfRows++) {
    pShow->pIter = mnodeGetNextMnode(pShow->pIter, &pMnode);
    if (pMnode == NULL) break;

    int32_t col = 0;
    char *  pCell = NULL;

    // id
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    *(int16_t *)pCell = pMnode->mnodeId;
    col++;

    // end_point: taken from the dnode sharing this mnode's id
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
    if (pDnode == NULL) {
      STR_WITH_MAXSIZE_TO_VARSTR(pCell, "invalid ep", pShow->bytes[col] - VARSTR_HEADER_SIZE);
    } else {
      STR_WITH_MAXSIZE_TO_VARSTR(pCell, pDnode->dnodeEp, pShow->bytes[col] - VARSTR_HEADER_SIZE);
    }
    // NOTE(review): called with NULL when the dnode is missing; relies on
    // mnodeDecDnodeRef() tolerating NULL - confirm.
    mnodeDecDnodeRef(pDnode);
    col++;

    // role
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    char *roles = mnodeGetMnodeRoleStr(pMnode->role);
    STR_TO_VARSTR(pCell, roles);
    col++;

    // create_time
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    *(int64_t *)pCell = pMnode->createdTime;

    mnodeDecMnodeRef(pMnode);
  }

  pShow->numOfReads += numOfRows;

  return numOfRows;
}