dmNodes.c 8.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
/**
 * Set up the parent-process side of a node: choose the shared-memory size
 * configured for the node type, create the shared-memory segment in
 * pWrapper->proc.shm and initialize the proc object via dmInitProc().
 *
 * @param pWrapper  wrapper of the node being opened
 * @return 0 on success; -1 if the node type has no shared-memory size, the
 *         segment cannot be created, or dmInitProc() fails
 */
static int32_t dmInitParentProc(SMgmtWrapper *pWrapper) {
  int32_t shmsize = 0;

  switch (pWrapper->ntype) {
    case VNODE:
      shmsize = tsVnodeShmSize;
      break;
    case QNODE:
      shmsize = tsQnodeShmSize;
      break;
    case SNODE:
      shmsize = tsSnodeShmSize;
      break;
    case MNODE:
      shmsize = tsMnodeShmSize;
      break;
    case BNODE:
      shmsize = tsBnodeShmSize;
      break;
    default:
      /* Any other node type is rejected. */
      return -1;
  }

  if (taosCreateShm(&pWrapper->proc.shm, pWrapper->ntype, shmsize) != 0) {
    /*
     * Convert the OS error, as the other system-call failure paths in this
     * file do. The previous code converted terrno itself, i.e. whatever stale
     * project error code happened to be set.
     * NOTE(review): assumes taosCreateShm() reports failures through errno.
     */
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
    return -1;
  }
  dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->proc.shm.id, shmsize);

  if (dmInitProc(pWrapper) != 0) {
    dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
50
/**
 * Launch a new process for the given node type, passing the config directory
 * ("-c") and the numeric node type ("-n") on its command line, and record the
 * resulting pid in pWrapper->proc.pid.
 *
 * @param pWrapper  wrapper whose proc.pid receives the new process id
 * @param ntype     node type, formatted as a decimal number for "-n"
 * @return 0 on success, -1 if taosNewProc() did not return a positive pid
 */
static int32_t dmNewNodeProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) {
  char nodeTypeArg[8] = {0};
  snprintf(nodeTypeArg, sizeof(nodeTypeArg), "%d", ntype);

  /* Slot 0 is left NULL here; presumably taosNewProc() fills in the program path - TODO confirm. */
  char *argv[6] = {NULL, "-c", configDir, "-n", nodeTypeArg, NULL};

  int32_t childPid = taosNewProc(argv);
  if (childPid <= 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
    return -1;
  }

  pWrapper->proc.pid = childPid;
  dInfo("node:%s, continue running in new process:%d", pWrapper->name, childPid);
  return 0;
}

S
Shengliang Guan 已提交
72
/**
 * Start the child process for a node (unless the dnode's rtype is NODE_END,
 * in which case only a log line is written) and then run the proc object.
 *
 * @param pWrapper  wrapper of the node being opened in the parent process
 * @return 0 on success, -1 if the child could not be spawned or dmRunProc() failed
 */
static int32_t dmRunParentProc(SMgmtWrapper *pWrapper) {
  if (pWrapper->pDnode->rtype != NODE_END) {
    if (dmNewNodeProc(pWrapper, pWrapper->ntype) != 0) {
      return -1;
    }
  } else {
    /* No child is spawned from here in this mode. */
    dInfo("node:%s, should be started manually in child process", pWrapper->name);
  }

  if (dmRunProc(&pWrapper->proc) == 0) {
    return 0;
  }

  dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
87
/**
 * Open one node according to the process type stored in pWrapper->proc.ptype.
 *
 * - Creates the node's directory.
 * - Single-process mode: calls the node's openFp and marks it deployed.
 * - Child-process mode: calls openFp, then initializes and runs the proc
 *   object, and marks the node deployed.
 * - Parent-process mode: creates the shared memory and proc object, writes
 *   the shm file, and runs the parent side (which may spawn the child).
 *
 * The management handle returned by openFp is stored in pWrapper->pMgmt right
 * after a successful open. This way it is already in place when the node is
 * marked deployed and when dmRunProc() is started, and it is not dropped if a
 * later step in this function fails.
 *
 * @param pWrapper  wrapper of the node to open
 * @return 0 on success, -1 on any failure
 */
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
  if (taosMkDir(pWrapper->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
    return -1;
  }

  SMgmtOutputOpt output = {0};
  SMgmtInputOpt  input = dmBuildMgmtInputOpt(pWrapper);

  if (pWrapper->ntype == DNODE || OnlyInChildProc(pWrapper->proc.ptype)) {
    tmsgSetDefaultMsgCb(&input.msgCb);
  }

  if (OnlyInSingleProc(pWrapper->proc.ptype)) {
    if ((*pWrapper->func.openFp)(&input, &output) != 0) {
      dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
      return -1;
    }
    /* Publish the handle before the node is flagged as deployed. */
    if (output.pMgmt != NULL) {
      pWrapper->pMgmt = output.pMgmt;
    }
    dDebug("node:%s, has been opened", pWrapper->name);
    pWrapper->deployed = true;
  }

  if (InChildProc(pWrapper->proc.ptype)) {
    if ((*pWrapper->func.openFp)(&input, &output) != 0) {
      dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
      return -1;
    }
    /*
     * Store the handle before the proc is initialized and run, so a failure
     * below does not lose it and it is set before dmRunProc() starts.
     */
    if (output.pMgmt != NULL) {
      pWrapper->pMgmt = output.pMgmt;
    }
    if (dmInitProc(pWrapper) != 0) {
      return -1;
    }
    if (dmRunProc(&pWrapper->proc) != 0) {
      return -1;
    }
    dDebug("node:%s, has been opened in child process", pWrapper->name);
    pWrapper->deployed = true;
  }

  if (InParentProc(pWrapper->proc.ptype)) {
    if (dmInitParentProc(pWrapper) != 0) {
      return -1;
    }
    if (dmWriteShmFile(pWrapper->path, pWrapper->name, &pWrapper->proc.shm) != 0) {
      return -1;
    }
    if (dmRunParentProc(pWrapper) != 0) {
      return -1;
    }
    dDebug("node:%s, has been opened in parent process", pWrapper->name);
  }

  dmReportStartup(pWrapper->pDnode, pWrapper->name, "openned");
  return 0;
}
S
shm  
Shengliang Guan 已提交
145

S
Shengliang Guan 已提交
146
/**
 * Start an opened node by invoking its optional startFp callback, then report
 * the "started" startup step. Does nothing (and returns 0) when the wrapper
 * is in parent-only process mode.
 *
 * @param pWrapper  wrapper of the node to start
 * @return 0 on success or when there is nothing to do, -1 if startFp fails
 */
int32_t dmStartNode(SMgmtWrapper *pWrapper) {
  if (OnlyInParentProc(pWrapper->proc.ptype)) {
    return 0;
  }

  if (pWrapper->func.startFp != NULL) {
    if ((*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) {
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  dmReportStartup(pWrapper->pDnode, pWrapper->name, "started");
  return 0;
}

S
Shengliang Guan 已提交
157
/**
 * Invoke the node's stopFp callback, if the node provides one and has a
 * management handle.
 *
 * @param pWrapper  wrapper of the node to stop
 */
void dmStopNode(SMgmtWrapper *pWrapper) {
  if (pWrapper->func.stopFp == NULL) return;
  if (pWrapper->pMgmt == NULL) return;

  (*pWrapper->func.stopFp)(pWrapper->pMgmt);
  dDebug("node:%s, has been stopped", pWrapper->name);
}

/**
 * Close a node: mark it as not deployed, wait until its reference count drops
 * to zero, terminate the child process when running in parent-only mode,
 * close the management handle under the write latch, and clean up the proc
 * object unless the node runs in single-process mode.
 *
 * @param pWrapper  wrapper of the node to close
 */
void dmCloseNode(SMgmtWrapper *pWrapper) {
  dInfo("node:%s, start to close", pWrapper->name);
  pWrapper->deployed = false;

  /* Poll until refCount reaches zero. */
  for (; pWrapper->refCount > 0;) {
    taosMsleep(10);
  }

  if (OnlyInParentProc(pWrapper->proc.ptype)) {
    int32_t childPid = pWrapper->proc.pid;
    if (childPid > 0) {
      if (taosProcExist(childPid)) {
        dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, childPid);
        taosKillProc(childPid);
        dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, childPid);
        taosWaitProc(childPid);
        dInfo("node:%s, child process:%d is stopped", pWrapper->name, childPid);
      }
    }
  }

  /* The handle is closed and cleared while holding the write latch. */
  taosWLockLatch(&pWrapper->latch);
  if (pWrapper->pMgmt != NULL) {
    (*pWrapper->func.closeFp)(pWrapper->pMgmt);
    pWrapper->pMgmt = NULL;
  }
  taosWUnLockLatch(&pWrapper->latch);

  if (OnlyInSingleProc(pWrapper->proc.ptype)) {
    /* No proc object to clean up in single-process mode. */
  } else {
    dmCleanupProc(pWrapper);
  }

  dInfo("node:%s, has been closed", pWrapper->name);
}

S
Shengliang Guan 已提交
197
/**
 * Open every required node of the dnode, in node-type order, then set the
 * dnode status to DND_STAT_RUNNING.
 *
 * @param pDnode  dnode whose wrappers are opened
 * @return 0 if all required nodes opened, -1 on the first failure
 */
static int32_t dmOpenNodes(SDnode *pDnode) {
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];

    if (pNode->required && dmOpenNode(pNode) != 0) {
      dError("node:%s, failed to open since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dmSetStatus(pDnode, DND_STAT_RUNNING);
  return 0;
}
S
shm  
Shengliang Guan 已提交
210

S
Shengliang Guan 已提交
211
/**
 * Start every required node of the dnode, in node-type order, and report that
 * initialization finished.
 *
 * @param pDnode  dnode whose wrappers are started
 * @return 0 if all required nodes started, -1 on the first failure
 */
static int32_t dmStartNodes(SDnode *pDnode) {
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];

    if (pNode->required && dmStartNode(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dInfo("TDengine initialized successfully");
  dmReportStartup(pDnode, "TDengine", "initialized successfully");
  return 0;
}

S
Shengliang Guan 已提交
226 227
/**
 * Call dmStopNode() on every wrapper of the dnode, in node-type order.
 *
 * @param pDnode  dnode whose nodes are stopped
 */
static void dmStopNodes(SDnode *pDnode) {
  EDndNodeType ntype = DNODE;
  while (ntype < NODE_END) {
    dmStopNode(&pDnode->wrappers[ntype]);
    ++ntype;
  }
}
S
shm  
Shengliang Guan 已提交
232

S
Shengliang Guan 已提交
233 234
/**
 * Call dmCloseNode() on every wrapper of the dnode, in node-type order.
 *
 * @param pDnode  dnode whose nodes are closed
 */
static void dmCloseNodes(SDnode *pDnode) {
  EDndNodeType ntype = DNODE;
  while (ntype < NODE_END) {
    dmCloseNode(&pDnode->wrappers[ntype]);
    ++ntype;
  }
}
S
shm  
Shengliang Guan 已提交
239

S
Shengliang Guan 已提交
240
/**
 * Check the child processes of all required nodes that run in parent-process
 * mode and respawn any whose process no longer exists. Only acts when the
 * dnode itself is in parent-process mode and its rtype is not NODE_END. The
 * scan runs with pDnode->mutex held and skips the DNODE entry.
 *
 * @param pDnode  dnode whose child processes are supervised
 */
static void dmWatchNodes(SDnode *pDnode) {
  if (!InParentProc(pDnode->ptype) || pDnode->rtype == NODE_END) {
    return;
  }

  taosThreadMutexLock(&pDnode->mutex);

  for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    SProc        *pProc = &pNode->proc;

    if (!pNode->required) continue;
    if (!InParentProc(pProc->ptype)) continue;

    /* A positive pid whose process still exists needs no action. */
    if (pProc->pid > 0 && taosProcExist(pProc->pid)) continue;

    dWarn("node:%s, process:%d is killed and needs to restart", pNode->name, pProc->pid);
    dmCloseProcRpcHandles(pProc);
    /* Result is not checked here; dmNewNodeProc() logs its own failure. */
    dmNewNodeProc(pNode, ntype);
  }

  taosThreadMutexUnlock(&pDnode->mutex);
}
S
shm  
Shengliang Guan 已提交
260

S
Shengliang Guan 已提交
261 262 263
/**
 * Main loop of the dnode: open and start all nodes, then supervise child
 * processes every 100 ms until DND_EVENT_STOP is set in pDnode->event, at
 * which point all nodes are stopped and closed.
 *
 * @param pDnode  dnode to run
 * @return 0 after a stop event has been handled, -1 if opening or starting
 *         the nodes fails
 */
int32_t dmRun(SDnode *pDnode) {
  if (dmOpenNodes(pDnode) != 0) {
    dError("failed to open nodes since %s", terrstr());
    return -1;
  }

  if (dmStartNodes(pDnode) != 0) {
    dError("failed to start nodes since %s", terrstr());
    return -1;
  }

  /* Keep supervising until a stop event is flagged. */
  while (!(pDnode->event & DND_EVENT_STOP)) {
    dmWatchNodes(pDnode);
    taosMsleep(100);
  }

  dInfo("dnode is about to stop");
  dmSetStatus(pDnode, DND_STAT_STOPPED);
  dmStopNodes(pDnode);
  dmCloseNodes(pDnode);
  return 0;
}