mmMgmt.c 8.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "mmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
#include "dndMgmt.h"
S
Shengliang Guan 已提交
20 21
#include "dndTransport.h"

S
shm  
Shengliang Guan 已提交
22
#if 0
S
shm  
Shengliang Guan 已提交
23 24 25 26 27
static void    mmInitOption(SDnode *pDnode, SMnodeOpt *pOption);
static void    mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption);
static void    mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption);
static bool    mmDeployRequired(SDnode *pDnode);
static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption);
S
shm  
Shengliang Guan 已提交
28

S
shm  
Shengliang Guan 已提交
29 30 31 32
int32_t mmInit(SDnode *pDnode) {
  dInfo("mnode mgmt start to init");
  int32_t code = -1;

S
shm  
Shengliang Guan 已提交
33
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
shm  
Shengliang Guan 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
  taosInitRWLatch(&pMgmt->latch);
  mmInitMsgFp(pMgmt);

  if (mmReadFile(pDnode) != 0) {
    goto _OVER;
  }

  if (pMgmt->dropped) {
    dInfo("mnode has been dropped and needs to be deleted");
    mndDestroy(pDnode->dir.mnode);
    code = 0;
    goto _OVER;
  }

  if (!pMgmt->deployed) {
    bool required = mmDeployRequired(pDnode);
    if (!required) {
      dInfo("mnode does not need to be deployed");
      code = 0;
      goto _OVER;
    }

    dInfo("mnode start to deploy");
    SMnodeOpt option = {0};
    mmBuildOptionForDeploy(pDnode, &option);
    code = mmOpen(pDnode, &option);
  } else {
    dInfo("mnode start to open");
    SMnodeOpt option = {0};
    mmBuildOptionForOpen(pDnode, &option);
    code = mmOpen(pDnode, &option);
  }

_OVER:
  if (code == 0) {
    dInfo("mnode mgmt init success");
  } else {
    dError("failed to init mnode mgmt since %s", terrstr());
    mmCleanup(pDnode);
  }

  return code;
}
S
shm  
Shengliang Guan 已提交
77

S
shm  
Shengliang Guan 已提交
78 79
void mmCleanup(SDnode *pDnode) {
  dInfo("mnode mgmt start to clean up");
S
shm  
Shengliang Guan 已提交
80
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
shm  
Shengliang Guan 已提交
81 82 83 84 85 86 87
  if (pMgmt->pMnode) {
    mmStopWorker(pDnode);
    mndClose(pMgmt->pMnode);
    pMgmt->pMnode = NULL;
  }
  dInfo("mnode mgmt is cleaned up");
}
S
Shengliang Guan 已提交
88

S
shm  
Shengliang Guan 已提交
89
SMnode *mmAcquire(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
90
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
91 92 93 94 95 96 97 98 99 100 101 102
  SMnode     *pMnode = NULL;
  int32_t     refCount = 0;

  taosRLockLatch(&pMgmt->latch);
  if (pMgmt->deployed && !pMgmt->dropped) {
    refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
    pMnode = pMgmt->pMnode;
  } else {
    terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
  }
  taosRUnLockLatch(&pMgmt->latch);

103 104 105
  if (pMnode != NULL) {
    dTrace("acquire mnode, refCount:%d", refCount);
  }
S
Shengliang Guan 已提交
106 107 108
  return pMnode;
}

S
shm  
Shengliang Guan 已提交
109
void mmRelease(SDnode *pDnode, SMnode *pMnode) {
S
Shengliang Guan 已提交
110
  if (pMnode == NULL) return;
S
Shengliang Guan 已提交
111

S
shm  
Shengliang Guan 已提交
112
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
113
  taosRLockLatch(&pMgmt->latch);
S
Shengliang Guan 已提交
114
  int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
S
Shengliang Guan 已提交
115
  taosRUnLockLatch(&pMgmt->latch);
S
Shengliang Guan 已提交
116
  dTrace("release mnode, refCount:%d", refCount);
S
Shengliang Guan 已提交
117 118
}

S
shm  
Shengliang Guan 已提交
119
int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) {
S
shm  
Shengliang Guan 已提交
120
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
121
  pMgmt->singleProc = true;
S
shm  
Shengliang Guan 已提交
122 123 124 125

  int32_t code = mmOpenImp(pDnode, pOption);

  if (code == 0 && !pMgmt->singleProc) {
S
shm  
Shengliang Guan 已提交
126 127 128 129 130 131 132 133
    SProcCfg cfg = {.childQueueSize = 1024 * 1024,
                    .childConsumeFp = (ProcConsumeFp)mmConsumeChildQueue,
                    .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
                    .childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
                    .childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                    .childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                    .parentQueueSize = 1024 * 1024,
                    .parentConsumeFp = (ProcConsumeFp)mmConsumeParentQueue,
S
shm  
Shengliang Guan 已提交
134 135
                    .parentdMallocHeadFp = (ProcMallocFp)malloc,
                    .parentFreeHeadFp = (ProcFreeFp)free,
S
shm  
Shengliang Guan 已提交
136 137
                    .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                    .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
S
shm  
Shengliang Guan 已提交
138 139 140
                    .testFlag = 0,
                    .pParent = pDnode,
                    .name = "mnode"};
S
shm  
Shengliang Guan 已提交
141 142 143 144 145 146

    pMgmt->pProcess = taosProcInit(&cfg);
    if (pMgmt->pProcess == NULL) {
      return -1;
    }

S
shm  
Shengliang Guan 已提交
147
    return taosProcRun(pMgmt->pProcess);
S
Shengliang Guan 已提交
148 149
  }

S
shm  
Shengliang Guan 已提交
150 151 152 153
  return code;
}

int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) {
S
shm  
Shengliang Guan 已提交
154
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
shm  
Shengliang Guan 已提交
155 156 157 158

  SMnode *pMnode = mmAcquire(pDnode);
  if (pMnode == NULL) {
    dError("failed to alter mnode since %s", terrstr());
S
Shengliang Guan 已提交
159 160 161
    return -1;
  }

S
shm  
Shengliang Guan 已提交
162 163 164
  if (mndAlter(pMnode, pOption) != 0) {
    dError("failed to alter mnode since %s", terrstr());
    mmRelease(pDnode, pMnode);
S
Shengliang Guan 已提交
165 166 167
    return -1;
  }

S
shm  
Shengliang Guan 已提交
168
  mmRelease(pDnode, pMnode);
S
Shengliang Guan 已提交
169 170 171
  return 0;
}

S
shm  
Shengliang Guan 已提交
172
int32_t mmDrop(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
173
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
174

S
shm  
Shengliang Guan 已提交
175 176 177 178 179 180 181 182 183 184 185 186 187 188
  SMnode *pMnode = mmAcquire(pDnode);
  if (pMnode == NULL) {
    dError("failed to drop mnode since %s", terrstr());
    return -1;
  }

  taosRLockLatch(&pMgmt->latch);
  pMgmt->dropped = 1;
  taosRUnLockLatch(&pMgmt->latch);

  if (mmWriteFile(pDnode) != 0) {
    taosRLockLatch(&pMgmt->latch);
    pMgmt->dropped = 0;
    taosRUnLockLatch(&pMgmt->latch);
S
Shengliang Guan 已提交
189

S
shm  
Shengliang Guan 已提交
190 191 192
    mmRelease(pDnode, pMnode);
    dError("failed to drop mnode since %s", terrstr());
    return -1;
S
Shengliang Guan 已提交
193
  }
194

S
shm  
Shengliang Guan 已提交
195 196 197 198 199 200 201 202 203
  mmRelease(pDnode, pMnode);
  mmStopWorker(pDnode);
  pMgmt->deployed = 0;
  mmWriteFile(pDnode);
  mndClose(pMnode);
  pMgmt->pMnode = NULL;
  mndDestroy(pDnode->dir.mnode);

  return 0;
S
Shengliang Guan 已提交
204 205
}

S
shm  
Shengliang Guan 已提交
206
static bool mmDeployRequired(SDnode *pDnode) {
S
Shengliang Guan 已提交
207 208 209 210 211 212 213
  if (dndGetDnodeId(pDnode) > 0) {
    return false;
  }

  if (dndGetClusterId(pDnode) > 0) {
    return false;
  }
S
Shengliang Guan 已提交
214

S
Shengliang Guan 已提交
215
  if (strcmp(pDnode->cfg.localEp, pDnode->cfg.firstEp) != 0) {
S
Shengliang Guan 已提交
216 217 218 219 220 221
    return false;
  }

  return true;
}

S
shm  
Shengliang Guan 已提交
222
static void mmInitOption(SDnode *pDnode, SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
223
  pOption->pDnode = pDnode;
S
Shengliang Guan 已提交
224 225 226
  pOption->sendReqToDnodeFp = dndSendReqToDnode;
  pOption->sendReqToMnodeFp = dndSendReqToMnode;
  pOption->sendRedirectRspFp = dndSendRedirectRsp;
S
shm  
Shengliang Guan 已提交
227 228
  pOption->putReqToMWriteQFp = mmPutMsgToWriteQueue;
  pOption->putReqToMReadQFp = mmPutMsgToReadQueue;
S
Shengliang Guan 已提交
229 230 231 232
  pOption->dnodeId = dndGetDnodeId(pDnode);
  pOption->clusterId = dndGetClusterId(pDnode);
}

S
shm  
Shengliang Guan 已提交
233 234
static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) {
  mmInitOption(pDnode, pOption);
S
Shengliang Guan 已提交
235 236 237 238
  pOption->replica = 1;
  pOption->selfIndex = 0;
  SReplica *pReplica = &pOption->replicas[0];
  pReplica->id = 1;
S
Shengliang Guan 已提交
239 240
  pReplica->port = pDnode->cfg.serverPort;
  memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN);
241

S
shm  
Shengliang Guan 已提交
242
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
243 244 245
  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->replica = pOption->replica;
  memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
S
Shengliang Guan 已提交
246 247
}

S
shm  
Shengliang Guan 已提交
248 249
static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption) {
  mmInitOption(pDnode, pOption);
S
shm  
Shengliang Guan 已提交
250
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
251 252 253
  pOption->selfIndex = pMgmt->selfIndex;
  pOption->replica = pMgmt->replica;
  memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
S
Shengliang Guan 已提交
254 255
}

S
shm  
Shengliang Guan 已提交
256 257
int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
  mmInitOption(pDnode, pOption);
S
Shengliang Guan 已提交
258 259 260
  pOption->dnodeId = dndGetDnodeId(pDnode);
  pOption->clusterId = dndGetClusterId(pDnode);

S
Shengliang Guan 已提交
261
  pOption->replica = pCreate->replica;
S
Shengliang Guan 已提交
262
  pOption->selfIndex = -1;
S
Shengliang Guan 已提交
263
  for (int32_t i = 0; i < pCreate->replica; ++i) {
264
    SReplica *pReplica = &pOption->replicas[i];
S
Shengliang Guan 已提交
265 266 267
    pReplica->id = pCreate->replicas[i].id;
    pReplica->port = pCreate->replicas[i].port;
    memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
268
    if (pReplica->id == pOption->dnodeId) {
269
      pOption->selfIndex = i;
S
Shengliang Guan 已提交
270 271 272
    }
  }

S
Shengliang Guan 已提交
273
  if (pOption->selfIndex == -1) {
S
Shengliang Guan 已提交
274 275 276 277
    dError("failed to build mnode options since %s", terrstr());
    return -1;
  }

S
shm  
Shengliang Guan 已提交
278
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
279 280 281
  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->replica = pOption->replica;
  memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
S
Shengliang Guan 已提交
282 283 284
  return 0;
}

S
shm  
Shengliang Guan 已提交
285
static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) {
S
shm  
Shengliang Guan 已提交
286
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
287

288
  SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption);
S
Shengliang Guan 已提交
289 290
  if (pMnode == NULL) {
    dError("failed to open mnode since %s", terrstr());
S
Shengliang Guan 已提交
291 292 293
    return -1;
  }

S
shm  
Shengliang Guan 已提交
294
  if (mmStartWorker(pDnode) != 0) {
S
Shengliang Guan 已提交
295
    dError("failed to start mnode worker since %s", terrstr());
S
Shengliang Guan 已提交
296 297 298
    mndClose(pMnode);
    mndDestroy(pDnode->dir.mnode);
    return -1;
S
Shengliang Guan 已提交
299 300
  }

S
Shengliang Guan 已提交
301
  pMgmt->deployed = 1;
S
shm  
Shengliang Guan 已提交
302
  if (mmWriteFile(pDnode) != 0) {
S
Shengliang Guan 已提交
303
    dError("failed to write mnode file since %s", terrstr());
S
Shengliang Guan 已提交
304
    pMgmt->deployed = 0;
S
shm  
Shengliang Guan 已提交
305
    mmStopWorker(pDnode);
306 307
    mndClose(pMnode);
    mndDestroy(pDnode->dir.mnode);
S
Shengliang Guan 已提交
308
    return -1;
S
Shengliang Guan 已提交
309 310 311 312 313 314 315
  }

  taosWLockLatch(&pMgmt->latch);
  pMgmt->pMnode = pMnode;
  pMgmt->deployed = 1;
  taosWUnLockLatch(&pMgmt->latch);

S
Shengliang Guan 已提交
316
  dInfo("mnode open successfully");
S
Shengliang Guan 已提交
317 318
  return 0;
}
S
shm  
Shengliang Guan 已提交
319 320

#endif