dnodeMgmt.c 17.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
J
jtao1735 已提交
18
#include "cJSON.h"
S
slguan 已提交
19 20
#include "taoserror.h"
#include "taosmsg.h"
J
jtao1735 已提交
21
#include "ttimer.h"
S
slguan 已提交
22
#include "tsdb.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23
#include "twal.h"
24
#include "tqueue.h"
J
jtao1735 已提交
25 26
#include "tsync.h"
#include "ttimer.h"
S
Shengliang Guan 已提交
27
#include "tbn.h"
S
slguan 已提交
28
#include "tglobal.h"
J
jtao1735 已提交
29 30 31
#include "dnode.h"
#include "vnode.h"
#include "mnode.h"
32
#include "dnodeInt.h"
33
#include "dnodeMgmt.h"
S
TD-1762  
Shengliang Guan 已提交
34 35 36
#include "dnodeEps.h"
#include "dnodeCfg.h"
#include "dnodeMInfos.h"
37 38
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
J
jtao1735 已提交
39 40
#include "dnodeModule.h"

41 42 43 44 45 46 47 48 49
typedef struct {
  pthread_t thread;
  int32_t   threadIndex;
  int32_t   failed;
  int32_t   opened;
  int32_t   vnodeNum;
  int32_t * vnodeList;
} SOpenVnodeThread;

S
TD-1915  
Shengliang Guan 已提交
50 51 52 53 54
typedef struct {
  SRpcMsg rpcMsg;
  char    pCont[];
} SMgmtMsg;

S
TD-1762  
Shengliang Guan 已提交
55 56 57 58 59 60 61
void *            tsDnodeTmr = NULL;
static void *     tsStatusTimer = NULL;
static uint32_t   tsRebootTime;
static taos_qset  tsMgmtQset = NULL;
static taos_queue tsMgmtQueue = NULL;
static pthread_t  tsQthread;

J
jtao1735 已提交
62 63
static void   dnodeProcessStatusRsp(SRpcMsg *pMsg);
static void   dnodeSendStatusMsg(void *handle, void *tmrId);
64
static void  *dnodeProcessMgmtQueue(void *param);
J
jtao1735 已提交
65

S
slguan 已提交
66
static int32_t  dnodeOpenVnodes();
67
static void     dnodeCloseVnodes();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68
static int32_t  dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
69
static int32_t  dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70 71 72
static int32_t  dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t  dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t  dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
S
TD-1762  
Shengliang Guan 已提交
73
static int32_t  dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
74
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
S
slguan 已提交
75 76

int32_t dnodeInitMgmt() {
S
slguan 已提交
77
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
S
TD-1671  
Shengliang Guan 已提交
78
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE]  = dnodeProcessAlterVnodeMsg;
S
slguan 已提交
79
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE]   = dnodeProcessDropVnodeMsg;
S
slguan 已提交
80 81
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
S
TD-1671  
Shengliang Guan 已提交
82
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg;
S
slguan 已提交
83

J
jtao1735 已提交
84 85 86
  dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP,  dnodeProcessStatusRsp);
  tsRebootTime = taosGetTimestampSec();

S
Shengliang Guan 已提交
87 88 89 90 91 92
  int32_t code = vnodeInitResources();
  if (code != TSDB_CODE_SUCCESS) {
    dnodeCleanupMgmt();
    return -1;
  }

93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
  // create the queue and thread to handle the message 
  tsMgmtQset = taosOpenQset();
  if (tsMgmtQset == NULL) {
    dError("failed to create the mgmt queue set");
    dnodeCleanupMgmt();
    return -1;
  }

  tsMgmtQueue = taosOpenQueue();
  if (tsMgmtQueue == NULL) {
    dError("failed to create the mgmt queue");
    dnodeCleanupMgmt();
    return -1;
  }

  taosAddIntoQset(tsMgmtQset, tsMgmtQueue, NULL);

  pthread_attr_t thAttr;
  pthread_attr_init(&thAttr);
  pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

S
Shengliang Guan 已提交
114
  code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
115 116 117 118 119 120 121 122
  pthread_attr_destroy(&thAttr);
  if (code != 0) {
    dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno));
    dnodeCleanupMgmt();
    return -1; 
  }

  code = dnodeOpenVnodes();
S
[TD-17]  
slguan 已提交
123
  if (code != TSDB_CODE_SUCCESS) {
124 125 126 127
    dnodeCleanupMgmt();
    return -1;
  }

128 129 130 131 132 133
  dInfo("dnode mgmt is initialized");
 
  return TSDB_CODE_SUCCESS;
}

int32_t dnodeInitMgmtTimer() {
134 135 136 137
  tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
  if (tsDnodeTmr == NULL) {
    dError("failed to init dnode timer");
    dnodeCleanupMgmt();
S
[TD-17]  
slguan 已提交
138 139
    return -1;
  }
S
slguan 已提交
140

J
jtao1735 已提交
141
  taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
142
  dInfo("dnode mgmt timer is initialized");
S
[TD-17]  
slguan 已提交
143
  return TSDB_CODE_SUCCESS;
S
#1177  
slguan 已提交
144 145
}

146 147 148 149 150 151 152
void dnodeSendStatusMsgToMnode() {
  if (tsDnodeTmr != NULL && tsStatusTimer != NULL) {
    dInfo("force send status msg to mnode");
    taosTmrReset(dnodeSendStatusMsg, 3, NULL, tsDnodeTmr, &tsStatusTimer);
  }
}

153
void dnodeCleanupMgmtTimer() {
J
jtao1735 已提交
154 155 156 157 158 159 160 161 162
  if (tsStatusTimer != NULL) {
    taosTmrStopA(&tsStatusTimer);
    tsStatusTimer = NULL;
  }

  if (tsDnodeTmr != NULL) {
    taosTmrCleanUp(tsDnodeTmr);
    tsDnodeTmr = NULL;
  }
163
}
J
jtao1735 已提交
164

165 166
void dnodeCleanupMgmt() {
  dnodeCleanupMgmtTimer();
167
  dnodeCloseVnodes();
168 169 170 171 172 173 174 175 176

  if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset);
  if (tsQthread) pthread_join(tsQthread, NULL);

  if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue);
  if (tsMgmtQset) taosCloseQset(tsMgmtQset);
  tsMgmtQset = NULL;
  tsMgmtQueue = NULL;

177
  vnodeCleanupResources();
S
#1177  
slguan 已提交
178 179
}

S
TD-1915  
Shengliang Guan 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
  int32_t   size = sizeof(SMgmtMsg) + pMsg->contLen;
  SMgmtMsg *pMgmt = taosAllocateQitem(size);
  if (pMgmt == NULL) {
    return TSDB_CODE_DND_OUT_OF_MEMORY;
  }

  pMgmt->rpcMsg = *pMsg;
  pMgmt->rpcMsg.pCont = pMgmt->pCont;
  memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen);
  taosWriteQitem(tsMgmtQueue, TAOS_QTYPE_RPC, pMgmt);

  return TSDB_CODE_SUCCESS;
}

195
void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) {
S
TD-1915  
Shengliang Guan 已提交
196 197 198
  int32_t code = dnodeWriteToMgmtQueue(pMsg);
  if (code != TSDB_CODE_SUCCESS) {
    SRpcMsg rsp = {.handle = pMsg->handle, .code = code};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
199 200
    rpcSendResponse(&rsp);
  }
S
TD-1915  
Shengliang Guan 已提交
201 202

  rpcFreeCont(pMsg->pCont);
203 204 205
}

static void *dnodeProcessMgmtQueue(void *param) {
S
TD-1915  
Shengliang Guan 已提交
206 207 208 209 210
  SMgmtMsg *pMgmt;
  SRpcMsg * pMsg;
  SRpcMsg   rsp = {0};
  int32_t   qtype;
  void *    handle;
211 212

  while (1) {
S
TD-1915  
Shengliang Guan 已提交
213
    if (taosReadQitemFromQset(tsMgmtQset, &qtype, (void **)&pMgmt, &handle) == 0) {
S
TD-1673  
Shengliang Guan 已提交
214
      dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset);
215 216 217
      break;
    }

S
TD-1915  
Shengliang Guan 已提交
218
    pMsg = &pMgmt->rpcMsg;
S
Shengliang Guan 已提交
219
    dDebug("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[pMsg->msgType]);
220 221 222
    if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
      rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
    } else {
223
      rsp.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
224 225 226
    }

    rsp.handle = pMsg->handle;
S
TD-1915  
Shengliang Guan 已提交
227
    rsp.pCont = NULL;
228 229 230 231
    rpcSendResponse(&rsp);

    taosFreeQitem(pMsg);
  }
S
slguan 已提交
232

233
  return NULL;
S
slguan 已提交
234
}
S
slguan 已提交
235

236
static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
S
slguan 已提交
237 238
  DIR *dir = opendir(tsVnodeDir);
  if (dir == NULL) {
239
    return TSDB_CODE_DND_NO_WRITE_ACCESS;
S
slguan 已提交
240 241
  }

242
  *numOfVnodes = 0;
S
slguan 已提交
243 244 245 246 247 248 249 250
  struct dirent *de = NULL;
  while ((de = readdir(dir)) != NULL) {
    if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
    if (de->d_type & DT_DIR) {
      if (strncmp("vnode", de->d_name, 5) != 0) continue;
      int32_t vnode = atoi(de->d_name + 5);
      if (vnode == 0) continue;

251
      (*numOfVnodes)++;
252 253 254 255 256 257 258

      if (*numOfVnodes >= TSDB_MAX_VNODES) {
        dError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES);
        continue;
      } else {
        vnodeList[*numOfVnodes - 1] = vnode;
      }
S
slguan 已提交
259 260 261 262
    }
  }
  closedir(dir);

263
  return TSDB_CODE_SUCCESS;
264 265
}

266 267
static void *dnodeOpenVnode(void *param) {
  SOpenVnodeThread *pThread = param;
S
Shengliang Guan 已提交
268 269
  char vnodeDir[TSDB_FILENAME_LEN * 3];

270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    int32_t vgId = pThread->vnodeList[v];
    snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vgId);
    if (vnodeOpen(vgId, vnodeDir) < 0) {
      dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
      dDebug("vgId:%d, is openned by thread:%d", vgId, pThread->threadIndex);
      pThread->opened++;
    }
  }

  dDebug("thread:%d, total vnodes:%d, openned:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
         pThread->failed);
  return NULL;
}

static int32_t dnodeOpenVnodes() {
S
Shengliang Guan 已提交
290
  int32_t vnodeList[TSDB_MAX_VNODES] = {0};
S
Shengliang Guan 已提交
291
  int32_t numOfVnodes = 0;
292
  int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
293 294

  if (status != TSDB_CODE_SUCCESS) {
295
    dInfo("get dnode list failed");
296 297
    return status;
  }
298

299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
  int32_t threadNum = tsNumOfCores;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
  SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t t = v % threadNum;
    SOpenVnodeThread *pThread = &threads[t];
    pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
  }

  dDebug("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SOpenVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnode, pThread) != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    pthread_attr_destroy(&thAttr);
  }

  int32_t openVnodes = 0;
  int32_t failedVnodes = 0;
  for (int32_t t = 0; t < threadNum; ++t) {
    SOpenVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && pThread->thread) {
      pthread_join(pThread->thread, NULL);
    }
    openVnodes += pThread->opened;
    failedVnodes += pThread->failed;
    free(pThread->vnodeList);
S
slguan 已提交
339
  }
340

341
  free(threads);
342 343
  dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);

S
slguan 已提交
344
  return TSDB_CODE_SUCCESS;
345 346 347
}

static void dnodeCloseVnodes() {
S
Shengliang Guan 已提交
348
  int32_t vnodeList[TSDB_MAX_VNODES]= {0};
S
Shengliang Guan 已提交
349
  int32_t numOfVnodes = 0;
350 351
  int32_t status;

352
  status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
353 354

  if (status != TSDB_CODE_SUCCESS) {
355
    dInfo("get dnode list failed");
356 357
    return;
  }
S
slguan 已提交
358 359 360 361 362

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    vnodeClose(vnodeList[i]);
  }

363
  dInfo("total vnodes:%d are all closed", numOfVnodes);
H
hzcheng 已提交
364 365
}

S
Shengliang Guan 已提交
366
static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
S
TD-1732  
Shengliang Guan 已提交
367
  SCreateVnodeMsg *pCreate = rpcMsg->pCont;
S
slguan 已提交
368
  pCreate->cfg.vgId                = htonl(pCreate->cfg.vgId);
S
slguan 已提交
369
  pCreate->cfg.cfgVersion          = htonl(pCreate->cfg.cfgVersion);
S
slguan 已提交
370
  pCreate->cfg.maxTables           = htonl(pCreate->cfg.maxTables);
S
slguan 已提交
371 372
  pCreate->cfg.cacheBlockSize      = htonl(pCreate->cfg.cacheBlockSize);
  pCreate->cfg.totalBlocks         = htonl(pCreate->cfg.totalBlocks);
S
slguan 已提交
373 374 375 376
  pCreate->cfg.daysPerFile         = htonl(pCreate->cfg.daysPerFile);
  pCreate->cfg.daysToKeep1         = htonl(pCreate->cfg.daysToKeep1);
  pCreate->cfg.daysToKeep2         = htonl(pCreate->cfg.daysToKeep2);
  pCreate->cfg.daysToKeep          = htonl(pCreate->cfg.daysToKeep);
S
slguan 已提交
377 378
  pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
  pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
379
  pCreate->cfg.fsyncPeriod         = htonl(pCreate->cfg.fsyncPeriod);
S
slguan 已提交
380 381
  pCreate->cfg.commitTime          = htonl(pCreate->cfg.commitTime);

382
  for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
S
slguan 已提交
383
    pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
384
  }
385

S
Shengliang Guan 已提交
386 387 388 389
  return pCreate;
}

static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
S
TD-1732  
Shengliang Guan 已提交
390
  SCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg);
S
Shengliang Guan 已提交
391

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
392
  void *pVnode = vnodeAcquire(pCreate->cfg.vgId);
S
slguan 已提交
393
  if (pVnode != NULL) {
S
Shengliang Guan 已提交
394
    dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId);
S
slguan 已提交
395
    vnodeRelease(pVnode);
S
Shengliang Guan 已提交
396
    return TSDB_CODE_SUCCESS;
S
slguan 已提交
397
  } else {
S
Shengliang Guan 已提交
398
    dDebug("vgId:%d, create vnode msg is received", pCreate->cfg.vgId);
S
slguan 已提交
399 400
    return vnodeCreate(pCreate);
  }
S
slguan 已提交
401 402
}

S
Shengliang Guan 已提交
403
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
S
TD-1732  
Shengliang Guan 已提交
404
  SAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg);
S
Shengliang Guan 已提交
405

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
406
  void *pVnode = vnodeAcquire(pAlter->cfg.vgId);
S
Shengliang Guan 已提交
407 408 409 410 411 412 413 414 415 416 417
  if (pVnode != NULL) {
    dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId);
    int32_t code = vnodeAlter(pVnode, pAlter);
    vnodeRelease(pVnode);
    return code;
  } else {
    dError("vgId:%d, vnode not exist, can't alter it", pAlter->cfg.vgId);
    return TSDB_CODE_VND_INVALID_VGROUP_ID;
  }
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
418
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
S
TD-1732  
Shengliang Guan 已提交
419
  SDropVnodeMsg *pDrop = rpcMsg->pCont;
S
slguan 已提交
420
  pDrop->vgId = htonl(pDrop->vgId);
S
slguan 已提交
421

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
422
  return vnodeDrop(pDrop->vgId);
H
hzcheng 已提交
423 424
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
425
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
S
TD-1732  
Shengliang Guan 已提交
426
//  SAlterStreamMsg *pStream = pCont;
S
slguan 已提交
427 428 429 430 431 432 433
//  pStream->uid    = htobe64(pStream->uid);
//  pStream->stime  = htobe64(pStream->stime);
//  pStream->vnode  = htonl(pStream->vnode);
//  pStream->sid    = htonl(pStream->sid);
//  pStream->status = htonl(pStream->status);
//
//  int32_t code = dnodeCreateStream(pStream);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
434 435

  return 0;
S
slguan 已提交
436 437
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
438
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
S
TD-1732  
Shengliang Guan 已提交
439
  SCfgDnodeMsg *pCfg = pMsg->pCont;
S
slguan 已提交
440
  return taosCfgDynamicOptions(pCfg->config);
S
slguan 已提交
441
}
J
jtao1735 已提交
442

S
TD-1671  
Shengliang Guan 已提交
443
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
S
TD-1732  
Shengliang Guan 已提交
444
  SCreateMnodeMsg *pCfg = pMsg->pCont;
S
TD-1671  
Shengliang Guan 已提交
445
  pCfg->dnodeId = htonl(pCfg->dnodeId);
S
TD-1671  
Shengliang Guan 已提交
446
  if (pCfg->dnodeId != dnodeGetDnodeId()) {
S
TD-2266  
Shengliang Guan 已提交
447
    dDebug("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
S
TD-1671  
Shengliang Guan 已提交
448 449 450 451
    return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
  }

  if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
S
TD-2266  
Shengliang Guan 已提交
452
    dDebug("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
S
TD-1671  
Shengliang Guan 已提交
453 454 455
    return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
  }

S
TD-1762  
Shengliang Guan 已提交
456 457 458 459
  dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.mnodeNum);
  for (int i = 0; i < pCfg->mnodes.mnodeNum; ++i) {
    pCfg->mnodes.mnodeInfos[i].mnodeId = htonl(pCfg->mnodes.mnodeInfos[i].mnodeId);
    dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp);
S
TD-1671  
Shengliang Guan 已提交
460 461 462
  }

  dnodeStartMnode(&pCfg->mnodes);
S
TD-1671  
Shengliang Guan 已提交
463 464 465 466

  return TSDB_CODE_SUCCESS;
}

J
jtao1735 已提交
467 468 469 470 471 472 473
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
  if (pMsg->code != TSDB_CODE_SUCCESS) {
    dError("status rsp is received, error:%s", tstrerror(pMsg->code));
    taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
    return;
  }

S
TD-1732  
Shengliang Guan 已提交
474
  SStatusRsp *pStatusRsp = pMsg->pCont;
S
TD-2331  
Shengliang Guan 已提交
475 476
  SMInfos *pMinfos = &pStatusRsp->mnodes;
  dnodeUpdateMInfos(pMinfos);
J
jtao1735 已提交
477

S
TD-1762  
Shengliang Guan 已提交
478 479
  SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
  pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
J
jtao1735 已提交
480
  pCfg->moduleStatus = htonl(pCfg->moduleStatus);
481
  pCfg->dnodeId = htonl(pCfg->dnodeId);
S
TD-1762  
Shengliang Guan 已提交
482
  dnodeUpdateCfg(pCfg);
J
jtao1735 已提交
483

484
  vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
S
TD-1671  
Shengliang Guan 已提交
485

S
TD-1732  
Shengliang Guan 已提交
486
  SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess));
S
TD-1762  
Shengliang Guan 已提交
487
  dnodeUpdateEps(pEps);
488

J
jtao1735 已提交
489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
  taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
}

static void dnodeSendStatusMsg(void *handle, void *tmrId) {
  if (tsDnodeTmr == NULL) {
    dError("dnode timer is already released");
    return;
  }

  if (tsStatusTimer == NULL) {
    taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
    dError("failed to start status timer");
    return;
  }

S
TD-1732  
Shengliang Guan 已提交
504 505
  int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
  SStatusMsg *pStatus = rpcMallocCont(contLen);
J
jtao1735 已提交
506 507 508 509 510 511
  if (pStatus == NULL) {
    taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
    dError("failed to malloc status message");
    return;
  }

S
TD-1762  
Shengliang Guan 已提交
512 513
  dnodeGetCfg(&pStatus->dnodeId, pStatus->clusterId);
  pStatus->dnodeId          = htonl(dnodeGetDnodeId());
J
jtao1735 已提交
514 515 516 517 518
  pStatus->version          = htonl(tsVersion);
  pStatus->lastReboot       = htonl(tsRebootTime);
  pStatus->numOfCores       = htons((uint16_t) tsNumOfCores);
  pStatus->diskAvailable    = tsAvailDataDirGB;
  pStatus->alternativeRole  = (uint8_t) tsAlternativeRole;
519
  tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
H
Hui Li 已提交
520 521

  // fill cluster cfg parameters
S
Shengliang Guan 已提交
522
  pStatus->clusterCfg.numOfMnodes        = htonl(tsNumOfMnodes);
523
  pStatus->clusterCfg.enableBalance      = htonl(tsEnableBalance);
S
Shengliang Guan 已提交
524 525 526 527 528
  pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(tsMnodeEqualVnodeNum);
  pStatus->clusterCfg.offlineThreshold   = htonl(tsOfflineThreshold);
  pStatus->clusterCfg.statusInterval     = htonl(tsStatusInterval);
  pStatus->clusterCfg.maxtablesPerVnode  = htonl(tsMaxTablePerVnode);
  pStatus->clusterCfg.maxVgroupsPerDb    = htonl(tsMaxVgroupsPerDb);
529 530
  tstrncpy(pStatus->clusterCfg.arbitrator, tsArbitrator, TSDB_EP_LEN);
  tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64);
H
Hui Li 已提交
531 532 533
  pStatus->clusterCfg.checkTime = 0;
  char timestr[32] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
534 535
  tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
  tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);  
J
jtao1735 已提交
536 537
  
  vnodeBuildStatusMsg(pStatus);
S
TD-1732  
Shengliang Guan 已提交
538
  contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
J
jtao1735 已提交
539 540 541 542 543 544 545 546
  pStatus->openVnodes = htons(pStatus->openVnodes);
  
  SRpcMsg rpcMsg = {
    .pCont   = pStatus,
    .contLen = contLen,
    .msgType = TSDB_MSG_TYPE_DM_STATUS
  };

547
  SRpcEpSet epSet;
S
TD-1762  
Shengliang Guan 已提交
548
  dnodeGetEpSetForPeer(&epSet);
549
  dnodeSendMsgToDnode(&epSet, &rpcMsg);
J
jtao1735 已提交
550 551
}

552
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
553
  SRpcConnInfo connInfo = {0};
554
  rpcGetConnInfo(rpcMsg->handle, &connInfo);
S
Shengliang Guan 已提交
555

556
  SRpcEpSet epSet = {0};
557
  if (forShell) {
S
TD-1762  
Shengliang Guan 已提交
558
    dnodeGetEpSetForShell(&epSet);
559
  } else {
S
TD-1762  
Shengliang Guan 已提交
560
    dnodeGetEpSetForPeer(&epSet);
561 562
  }
  
563 564
  dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType],
         taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse);
S
Shengliang Guan 已提交
565

566 567 568
  for (int i = 0; i < epSet.numOfEps; ++i) {
    dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
    epSet.port[i] = htons(epSet.port[i]);
S
Shengliang Guan 已提交
569 570
  }

571
  rpcSendRedirectRsp(rpcMsg->handle, &epSet);
572
}