mmInt.c 7.3 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mmInt.h"
S
xshm  
Shengliang Guan 已提交
18
#include "wal.h"
S
shm  
Shengliang Guan 已提交
19

S
shm  
Shengliang Guan 已提交
20 21 22 23 24
static bool mmDeployRequired(SDnode *pDnode) {
  if (pDnode->dnodeId > 0) return false;
  if (pDnode->clusterId > 0) return false;
  if (strcmp(pDnode->localEp, pDnode->firstEp) != 0) return false;
  return true;
S
shm  
Shengliang Guan 已提交
25 26
}

S
shm  
Shengliang Guan 已提交
27 28 29 30
static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) {
  SMnodeMgmt mgmt = {0};
  mgmt.path = pWrapper->path;
  if (mmReadFile(&mgmt, required) != 0) {
S
shm  
Shengliang Guan 已提交
31 32 33
    return -1;
  }

S
shm  
Shengliang Guan 已提交
34 35
  if (!(*required)) {
    *required = mmDeployRequired(pWrapper->pDnode);
S
shm  
Shengliang Guan 已提交
36 37 38 39 40 41 42
  }

  return 0;
}

static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
  SDnode *pDnode = pMgmt->pDnode;
S
shm  
Shengliang Guan 已提交
43 44
  pOption->dnodeId = pDnode->dnodeId;
  pOption->clusterId = pDnode->clusterId;
S
Shengliang Guan 已提交
45 46 47

  SMsgCb msgCb = {0};
  msgCb.pWrapper = pMgmt->pWrapper;
D
dapan1121 已提交
48 49
  msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue;
  msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue;
S
Shengliang Guan 已提交
50
  msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue;
S
Shengliang Guan 已提交
51
  msgCb.queueFps[SYNC_QUEUE] = mmPutMsgToWriteQueue;
S
Shengliang Guan 已提交
52 53 54
  msgCb.sendReqFp = dndSendReqToDnode;
  msgCb.sendMnodeReqFp = dndSendReqToMnode;
  msgCb.sendRspFp = dndSendRsp;
S
Shengliang Guan 已提交
55
  msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
S
Shengliang Guan 已提交
56
  pOption->msgCb = msgCb;
S
shm  
Shengliang Guan 已提交
57 58 59 60 61 62 63 64 65 66
}

static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
  SDnode *pDnode = pMgmt->pDnode;

  mmInitOption(pMgmt, pOption);
  pOption->replica = 1;
  pOption->selfIndex = 0;
  SReplica *pReplica = &pOption->replicas[0];
  pReplica->id = 1;
S
shm  
Shengliang Guan 已提交
67
  pReplica->port = pDnode->serverPort;
S
shm  
Shengliang Guan 已提交
68
  tstrncpy(pReplica->fqdn, pDnode->localFqdn, TSDB_FQDN_LEN);
S
shm  
Shengliang Guan 已提交
69 70 71 72 73 74 75 76 77 78 79 80 81

  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->replica = pOption->replica;
  memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}

static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
  mmInitOption(pMgmt, pOption);
  pOption->selfIndex = pMgmt->selfIndex;
  pOption->replica = pMgmt->replica;
  memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}

S
shm  
Shengliang Guan 已提交
82
static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
S
shm  
Shengliang Guan 已提交
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
  mmInitOption(pMgmt, pOption);

  pOption->replica = pCreate->replica;
  pOption->selfIndex = -1;
  for (int32_t i = 0; i < pCreate->replica; ++i) {
    SReplica *pReplica = &pOption->replicas[i];
    pReplica->id = pCreate->replicas[i].id;
    pReplica->port = pCreate->replicas[i].port;
    memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
    if (pReplica->id == pOption->dnodeId) {
      pOption->selfIndex = i;
    }
  }

  if (pOption->selfIndex == -1) {
    dError("failed to build mnode options since %s", terrstr());
    return -1;
  }

  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->replica = pOption->replica;
  memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
  return 0;
}

S
shm  
Shengliang Guan 已提交
108 109 110 111 112 113 114 115 116 117 118 119
static int32_t mmOpenImp(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq) {
  SMnodeOpt option = {0};
  if (pReq != NULL) {
    if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) {
      return -1;
    }
  } else {
    bool deployed = false;
    if (mmReadFile(pMgmt, &deployed) != 0) {
      dError("failed to read file since %s", terrstr());
      return -1;
    }
S
shm  
Shengliang Guan 已提交
120

S
shm  
Shengliang Guan 已提交
121 122 123 124 125 126 127
    if (!deployed) {
      dInfo("mnode start to deploy");
      mmBuildOptionForDeploy(pMgmt, &option);
    } else {
      dInfo("mnode start to open");
      mmBuildOptionForOpen(pMgmt, &option);
    }
S
shm  
Shengliang Guan 已提交
128 129
  }

S
shm  
Shengliang Guan 已提交
130 131 132 133
  pMgmt->pMnode = mndOpen(pMgmt->path, &option);
  if (pMgmt->pMnode == NULL) {
    dError("failed to open mnode since %s", terrstr());
    return -1;
S
shm  
Shengliang Guan 已提交
134 135
  }

S
shm  
Shengliang Guan 已提交
136 137 138
  if (mmStartWorker(pMgmt) != 0) {
    dError("failed to start mnode worker since %s", terrstr());
    return -1;
S
shm  
Shengliang Guan 已提交
139 140
  }

S
shm  
Shengliang Guan 已提交
141
  bool deployed = true;
S
shm  
Shengliang Guan 已提交
142
  if (mmWriteFile(pMgmt, deployed) != 0) {
S
shm  
Shengliang Guan 已提交
143 144
    dError("failed to write mnode file since %s", terrstr());
    return -1;
S
shm  
Shengliang Guan 已提交
145 146
  }

S
shm  
Shengliang Guan 已提交
147
  return 0;
S
shm  
Shengliang Guan 已提交
148 149
}

S
shm  
Shengliang Guan 已提交
150 151 152 153 154
static void mmCloseImp(SMnodeMgmt *pMgmt) {
  if (pMgmt->pMnode != NULL) {
    mmStopWorker(pMgmt);
    mndClose(pMgmt->pMnode);
    pMgmt->pMnode = NULL;
S
shm  
Shengliang Guan 已提交
155
  }
S
shm  
Shengliang Guan 已提交
156
}
S
shm  
Shengliang Guan 已提交
157

S
shm  
Shengliang Guan 已提交
158 159 160 161 162 163
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
  SMnodeOpt option = {0};
  if (pReq != NULL) {
    if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) {
      return -1;
    }
S
shm  
Shengliang Guan 已提交
164 165
  }

S
shm  
Shengliang Guan 已提交
166 167 168 169 170 171 172 173 174 175 176 177
  return mndAlter(pMgmt->pMnode, &option);
}

int32_t mmDrop(SMgmtWrapper *pWrapper) {
  SMnodeMgmt *pMgmt = pWrapper->pMgmt;
  if (pMgmt == NULL) return 0;

  dInfo("mnode-mgmt start to drop");
  bool deployed = false;
  if (mmWriteFile(pMgmt, deployed) != 0) {
    dError("failed to drop mnode since %s", terrstr());
    return -1;
S
shm  
Shengliang Guan 已提交
178 179
  }

S
shm  
Shengliang Guan 已提交
180 181 182
  mmCloseImp(pMgmt);
  taosRemoveDir(pMgmt->path);
  pWrapper->pMgmt = NULL;
wafwerar's avatar
wafwerar 已提交
183
  taosMemoryFree(pMgmt);
S
shm  
Shengliang Guan 已提交
184 185
  dInfo("mnode-mgmt is dropped");
  return 0;
S
shm  
Shengliang Guan 已提交
186 187
}

S
shm  
Shengliang Guan 已提交
188 189 190 191 192 193 194
static void mmClose(SMgmtWrapper *pWrapper) {
  SMnodeMgmt *pMgmt = pWrapper->pMgmt;
  if (pMgmt == NULL) return;

  dInfo("mnode-mgmt start to cleanup");
  mmCloseImp(pMgmt);
  pWrapper->pMgmt = NULL;
wafwerar's avatar
wafwerar 已提交
195
  taosMemoryFree(pMgmt);
S
shm  
Shengliang Guan 已提交
196 197 198 199 200 201 202
  dInfo("mnode-mgmt is cleaned up");
}

int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq) {
  dInfo("mnode-mgmt start to init");
  if (walInit() != 0) {
    dError("failed to init wal since %s", terrstr());
S
shm  
Shengliang Guan 已提交
203
    return -1;
S
shm  
Shengliang Guan 已提交
204 205
  }

wafwerar's avatar
wafwerar 已提交
206
  SMnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SMnodeMgmt));
S
shm  
Shengliang Guan 已提交
207 208
  if (pMgmt == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
shm  
Shengliang Guan 已提交
209
    return -1;
S
shm  
Shengliang Guan 已提交
210 211
  }

S
shm  
Shengliang Guan 已提交
212 213 214 215
  pMgmt->path = pWrapper->path;
  pMgmt->pDnode = pWrapper->pDnode;
  pMgmt->pWrapper = pWrapper;
  pWrapper->pMgmt = pMgmt;
S
shm  
Shengliang Guan 已提交
216

S
shm  
Shengliang Guan 已提交
217 218 219 220 221 222
  int32_t code = mmOpenImp(pMgmt, pReq);
  if (code != 0) {
    dError("failed to init mnode-mgmt since %s", terrstr());
    mmClose(pWrapper);
  } else {
    dInfo("mnode-mgmt is initialized");
S
shm  
Shengliang Guan 已提交
223 224
  }

S
shm  
Shengliang Guan 已提交
225 226 227 228 229
  return code;
}

static int32_t mmOpen(SMgmtWrapper *pWrapper) {
  return mmOpenFromMsg(pWrapper, NULL);
S
shm  
Shengliang Guan 已提交
230
}
S
shm  
Shengliang Guan 已提交
231

S
shm  
Shengliang Guan 已提交
232
static int32_t mmStart(SMgmtWrapper *pWrapper) {
S
Shengliang Guan 已提交
233
  dDebug("mnode-mgmt start to run");
S
shm  
Shengliang Guan 已提交
234 235 236 237
  SMnodeMgmt *pMgmt = pWrapper->pMgmt;
  return mndStart(pMgmt->pMnode);
}

S
shm  
Shengliang Guan 已提交
238
void mmGetMgmtFp(SMgmtWrapper *pWrapper) {
S
shm  
Shengliang Guan 已提交
239
  SMgmtFp mgmtFp = {0};
S
shm  
Shengliang Guan 已提交
240 241
  mgmtFp.openFp = mmOpen;
  mgmtFp.closeFp = mmClose;
S
shm  
Shengliang Guan 已提交
242
  mgmtFp.startFp = mmStart;
S
shm  
Shengliang Guan 已提交
243 244
  mgmtFp.createMsgFp = mmProcessCreateReq;
  mgmtFp.dropMsgFp = mmProcessDropReq;
S
shm  
Shengliang Guan 已提交
245
  mgmtFp.requiredFp = mmRequire;
S
shm  
Shengliang Guan 已提交
246 247 248 249

  mmInitMsgHandles(pWrapper);
  pWrapper->name = "mnode";
  pWrapper->fp = mgmtFp;
S
shm  
Shengliang Guan 已提交
250 251 252
}

int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
S
shm  
Shengliang Guan 已提交
253 254
  SMnodeMgmt *pMgmt = pWrapper->pMgmt;

S
shm  
Shengliang Guan 已提交
255
  int32_t code = mndRetriveAuth(pMgmt->pMnode, user, spi, encrypt, secret, ckey);
S
shm  
Shengliang Guan 已提交
256 257
  dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt);
  return code;
S
shm  
Shengliang Guan 已提交
258
}
S
shm  
Shengliang Guan 已提交
259

S
shm  
Shengliang Guan 已提交
260
int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
shm  
Shengliang Guan 已提交
261
                         SMonGrantInfo *pGrantInfo) {
S
shm  
Shengliang Guan 已提交
262
  SMnodeMgmt *pMgmt = pWrapper->pMgmt;
S
shm  
Shengliang Guan 已提交
263
  return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo);
D
dapan1121 已提交
264
}