dmNodes.c 8.3 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
S
Shengliang Guan 已提交
18

19
/*
 * Create the shared-memory segment for a node and store its handle in
 * pWrapper->proc.shm.
 *
 * The segment size is picked from the per-node-type setting
 * (tsVnodeShmSize, tsQnodeShmSize, tsSnodeShmSize or tsMnodeShmSize).
 *
 * Returns 0 on success, -1 when the node type has no shared-memory size
 * associated with it or when taosCreateShm() fails.
 */
static int32_t dmCreateShm(SMgmtWrapper *pWrapper) {
  int32_t shmsize = 0;

  switch (pWrapper->ntype) {
    case VNODE:
      shmsize = tsVnodeShmSize;
      break;
    case QNODE:
      shmsize = tsQnodeShmSize;
      break;
    case SNODE:
      shmsize = tsSnodeShmSize;
      break;
    case MNODE:
      shmsize = tsMnodeShmSize;
      break;
    default:
      // Do not fail silently: the caller only sees -1 and would otherwise
      // report whatever error string happens to be pending.
      dError("node:%s, failed to create shm since node type:%d is not supported", pWrapper->name, pWrapper->ntype);
      return -1;
  }

  if (taosCreateShm(&pWrapper->proc.shm, pWrapper->ntype, shmsize) != 0) {
    // Wrap errno, like the other OS-call failure paths in this file, instead
    // of re-wrapping a possibly stale terrno.
    // NOTE(review): assumes taosCreateShm() reports its failure through errno - confirm.
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
    return -1;
  }

  dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->proc.shm.id, shmsize);
  return 0;
}

43
/*
 * Launch a new process for the given node type and remember its pid in
 * pWrapper->proc.pid.
 *
 * The process is started through taosNewProc() with the arguments
 * "-c <configDir> -n <ntype>".
 *
 * Returns 0 on success, -1 (with terrno set from errno) when no process
 * could be started.
 */
static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) {
  char typeArg[8] = {0};
  snprintf(typeArg, sizeof(typeArg), "%d", ntype);

  // Slot 0 is intentionally left NULL here.
  // NOTE(review): presumably taosNewProc() fills in the program path - confirm.
  char *argv[6] = {NULL, "-c", configDir, "-n", typeArg, NULL};

  int32_t childPid = taosNewProc(argv);
  if (childPid > 0) {
    taosIgnSignal(SIGCHLD);
    pWrapper->proc.pid = childPid;
    dInfo("node:%s, continue running in new pid:%d", pWrapper->name, childPid);
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
66
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
67
  SDnode *pDnode = pWrapper->pDnode;
68

S
shm  
Shengliang Guan 已提交
69 70 71 72 73 74
  if (taosMkDir(pWrapper->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
75
  SMgmtOutputOpt output = {0};
S
Shengliang Guan 已提交
76
  SMgmtInputOpt  input = dmBuildMgmtInputOpt(pWrapper);
S
Shengliang Guan 已提交
77

78
  if (pWrapper->ntype == DNODE || InChildProc(pWrapper)) {
S
Shengliang Guan 已提交
79
    tmsgSetDefault(&input.msgCb);
S
Shengliang Guan 已提交
80 81
  }

82
  if (OnlyInSingleProc(pWrapper)) {
S
Shengliang Guan 已提交
83
    dInfo("node:%s, start to open", pWrapper->name);
S
Shengliang Guan 已提交
84
    if ((*pWrapper->func.openFp)(&input, &output) != 0) {
S
Shengliang Guan 已提交
85 86 87
      dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
      return -1;
    }
S
Shengliang Guan 已提交
88
    dInfo("node:%s, has been opened", pWrapper->name);
S
Shengliang Guan 已提交
89
    pWrapper->deployed = true;
S
Shengliang Guan 已提交
90 91
  }

92
  if (InParentProc(pWrapper)) {
S
Shengliang Guan 已提交
93
    dDebug("node:%s, start to open", pWrapper->name);
94
    if (dmCreateShm(pWrapper) != 0) {
S
Shengliang Guan 已提交
95 96
      return -1;
    }
97
    if (dmWriteShmFile(pWrapper->path, pWrapper->name, &pWrapper->proc.shm) != 0) {
S
Shengliang Guan 已提交
98 99
      return -1;
    }
100

101
    if (OnlyInParentProc(pWrapper)) {
102 103 104 105
      if (dmInitProc(pWrapper) != 0) {
        dError("node:%s, failed to init proc since %s", pWrapper->name, terrstr());
        return -1;
      }
106
      if (pDnode->rtype == NODE_END) {
107 108 109 110 111 112 113 114 115 116
        dInfo("node:%s, should be started manually in child process", pWrapper->name);
      } else {
        if (dmNewProc(pWrapper, pWrapper->ntype) != 0) {
          return -1;
        }
      }
      if (dmRunProc(&pWrapper->proc) != 0) {
        dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
        return -1;
      }
S
Shengliang Guan 已提交
117
    }
118
    dDebug("node:%s, has been opened in parent process", pWrapper->name);
S
Shengliang Guan 已提交
119 120
  }

121
  if (InChildProc(pWrapper)) {
S
Shengliang Guan 已提交
122
    dDebug("node:%s, start to open", pWrapper->name);
123 124
    if ((*pWrapper->func.openFp)(&input, &output) != 0) {
      dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
S
Shengliang Guan 已提交
125 126
      return -1;
    }
127
    if (dmInitProc(pWrapper) != 0) {
S
Shengliang Guan 已提交
128 129
      return -1;
    }
130
    if (dmRunProc(&pWrapper->proc) != 0) {
S
Shengliang Guan 已提交
131 132
      return -1;
    }
133 134
    dDebug("node:%s, has been opened in child process", pWrapper->name);
    pWrapper->deployed = true;
S
shm  
Shengliang Guan 已提交
135 136
  }

S
Shengliang Guan 已提交
137 138 139 140
  if (output.pMgmt != NULL) {
    pWrapper->pMgmt = output.pMgmt;
  }

141
  dmReportStartup(pWrapper->name, "openned");
S
shm  
Shengliang Guan 已提交
142
  return 0;
S
shm  
Shengliang Guan 已提交
143
}
S
shm  
Shengliang Guan 已提交
144

S
Shengliang Guan 已提交
145
/*
 * Start one node through its optional startFp callback and report the
 * "started" startup step.
 *
 * Nothing is done (and 0 is returned) when the wrapper runs purely as the
 * parent process. Returns -1 when the start callback fails.
 */
int32_t dmStartNode(SMgmtWrapper *pWrapper) {
  if (OnlyInParentProc(pWrapper)) {
    return 0;
  }

  if (pWrapper->func.startFp != NULL) {
    dDebug("node:%s, start to start", pWrapper->name);

    int32_t code = (*pWrapper->func.startFp)(pWrapper->pMgmt);
    if (code != 0) {
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }

    dDebug("node:%s, has been started", pWrapper->name);
  }

  dmReportStartup(pWrapper->name, "started");
  return 0;
}

S
Shengliang Guan 已提交
160
/*
 * Stop one node by invoking its stopFp callback on the management object.
 * Does nothing when the node has no stop callback or no management object.
 */
void dmStopNode(SMgmtWrapper *pWrapper) {
  if (pWrapper->func.stopFp == NULL || pWrapper->pMgmt == NULL) {
    return;
  }

  dDebug("node:%s, start to stop", pWrapper->name);
  (*pWrapper->func.stopFp)(pWrapper->pMgmt);
  dDebug("node:%s, has been stopped", pWrapper->name);
}

/*
 * Close one node.
 *
 * Steps, in order:
 *   1. clear the deployed flag and poll until refCount drops to zero;
 *   2. when running purely as the parent, kill the child process (if it
 *      still exists) and wait for it;
 *   3. under the wrapper's write lock, call closeFp on the management object
 *      and clear pMgmt;
 *   4. clean up the proc unless the node runs in single-process mode.
 */
void dmCloseNode(SMgmtWrapper *pWrapper) {
  dInfo("node:%s, start to close", pWrapper->name);
  pWrapper->deployed = false;

  // Poll in 10 ms steps until every outstanding reference has been released.
  for (; pWrapper->refCount > 0;) {
    taosMsleep(10);
  }

  if (OnlyInParentProc(pWrapper)) {
    int32_t childPid = pWrapper->proc.pid;
    if (childPid > 0 && taosProcExist(childPid)) {
      dInfo("node:%s, send kill signal to the child pid:%d", pWrapper->name, childPid);
      taosKillProc(childPid);
      dInfo("node:%s, wait for child pid:%d to stop", pWrapper->name, childPid);
      taosWaitProc(childPid);
      dInfo("node:%s, child pid:%d is stopped", pWrapper->name, childPid);
    }
  }

  taosThreadRwlockWrlock(&pWrapper->lock);
  if (pWrapper->pMgmt != NULL) {
    (*pWrapper->func.closeFp)(pWrapper->pMgmt);
    pWrapper->pMgmt = NULL;
  }
  taosThreadRwlockUnlock(&pWrapper->lock);

  if (!OnlyInSingleProc(pWrapper)) {
    dmCleanupProc(pWrapper);
  }

  dInfo("node:%s, has been closed", pWrapper->name);
}

S
Shengliang Guan 已提交
201
/*
 * Open every required node of the dnode, in node-type order, then set the
 * dnode status to DND_STAT_RUNNING.
 *
 * Returns 0 on success, -1 as soon as one node fails to open (the remaining
 * nodes are not attempted and the status is left unchanged).
 */
static int32_t dmOpenNodes(SDnode *pDnode) {
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];

    if (pNode->required && dmOpenNode(pNode) != 0) {
      dError("node:%s, failed to open since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dmSetStatus(pDnode, DND_STAT_RUNNING);
  return 0;
}
S
shm  
Shengliang Guan 已提交
214

S
Shengliang Guan 已提交
215
/*
 * Start every required node of the dnode, in node-type order, then log and
 * report that initialisation succeeded.
 *
 * Returns 0 on success, -1 as soon as one node fails to start.
 */
static int32_t dmStartNodes(SDnode *pDnode) {
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];

    if (pNode->required && dmStartNode(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dInfo("TDengine initialized successfully");
  dmReportStartup("TDengine", "initialized successfully");
  return 0;
}

S
Shengliang Guan 已提交
230 231
/*
 * Call dmStopNode() on every wrapper of the dnode, in node-type order.
 * Wrappers are not filtered by their required flag here.
 */
static void dmStopNodes(SDnode *pDnode) {
  EDndNodeType ntype = DNODE;
  while (ntype < NODE_END) {
    dmStopNode(&pDnode->wrappers[ntype]);
    ++ntype;
  }
}
S
shm  
Shengliang Guan 已提交
236

S
Shengliang Guan 已提交
237 238
/*
 * Call dmCloseNode() on every wrapper of the dnode, in node-type order.
 * Wrappers are not filtered by their required flag here.
 */
static void dmCloseNodes(SDnode *pDnode) {
  EDndNodeType ntype = DNODE;
  while (ntype < NODE_END) {
    dmCloseNode(&pDnode->wrappers[ntype]);
    ++ntype;
  }
}
S
shm  
Shengliang Guan 已提交
243

S
Shengliang Guan 已提交
244
/*
 * Check the child processes of the dnode and relaunch any that is gone.
 *
 * Only active when the dnode is the parent process and pDnode->rtype is not
 * NODE_END. Under pDnode->mutex, every node type after DNODE that is required
 * and runs purely in the parent is inspected: if its pid is not positive or
 * the process no longer exists, its RPC handles are closed and a new process
 * is launched. The result of the relaunch is not checked here.
 */
static void dmWatchNodes(SDnode *pDnode) {
  if (pDnode->ptype != PARENT_PROC || pDnode->rtype == NODE_END) {
    return;
  }

  taosThreadMutexLock(&pDnode->mutex);

  for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    SProc        *pChild = &pNode->proc;

    if (!pNode->required || !OnlyInParentProc(pNode)) {
      continue;
    }

    // Child is still there: nothing to do for this node.
    if (pChild->pid > 0 && taosProcExist(pChild->pid)) {
      continue;
    }

    dError("node:%s, pid:%d is killed and needs to restart", pNode->name, pChild->pid);
    dmCloseProcRpcHandles(&pNode->proc);
    dmNewProc(pNode, ntype);
  }

  taosThreadMutexUnlock(&pDnode->mutex);
}
S
shm  
Shengliang Guan 已提交
264

265
/*
 * Main loop of the dnode.
 *
 * Opens and starts all nodes, then loops every 100 ms: watches the child
 * processes via dmWatchNodes() and periodically calls osUpdate(). When
 * pDnode->stop becomes set, the status is switched to DND_STAT_STOPPED, all
 * nodes are stopped and closed, and the function returns.
 *
 * Returns 0 after a regular stop, -1 when opening or starting the nodes fails.
 */
int32_t dmRunDnode(SDnode *pDnode) {
  // Number of 100 ms loop passes between two osUpdate() calls.
  enum { DM_OS_UPDATE_INTERVAL = 10 };

  int count = 0;

  if (dmOpenNodes(pDnode) != 0) {
    dError("failed to open nodes since %s", terrstr());
    return -1;
  }

  if (dmStartNodes(pDnode) != 0) {
    dError("failed to start nodes since %s", terrstr());
    return -1;
  }

  while (1) {
    if (pDnode->stop) {
      dInfo("TDengine is about to stop");
      dmSetStatus(pDnode, DND_STAT_STOPPED);
      dmStopNodes(pDnode);
      dmCloseNodes(pDnode);
      return 0;
    }

    dmWatchNodes(pDnode);

    // Run osUpdate() on the first pass and then once every
    // DM_OS_UPDATE_INTERVAL passes. The counter has to wrap back to 0 for
    // that: a "count %= 10; count++" sequence cycles 1..10 and never gets
    // there, so osUpdate() would only ever run once.
    if (count == 0) {
      osUpdate();
    }
    count = (count + 1) % DM_OS_UPDATE_INTERVAL;

    taosMsleep(100);
  }
}