dndNode.c 13.4 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dndInt.h"
S
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
#include "bmInt.h"
S
shm  
Shengliang Guan 已提交
20
#include "dm.h"
S
shm  
Shengliang Guan 已提交
21
#include "mm.h"
S
shm  
Shengliang Guan 已提交
22 23 24
#include "qmInt.h"
#include "smInt.h"
#include "vmInt.h"
S
Shengliang Guan 已提交
25 26 27 28 29 30 31 32 33 34

/*
 * Redirect logging to a file named "<node name>log".
 * The current log is closed first, then the log is initialised again
 * under the new name.
 */
static void dndResetLog(SMgmtWrapper *pMgmt) {
  char fileName[24] = {0};

  snprintf(fileName, sizeof(fileName), "%slog", pMgmt->name);
  dInfo("node:%s, reset log to %s", pMgmt->name, fileName);

  taosCloseLog();
  taosInitLog(fileName, 1);
}

S
shm  
Shengliang Guan 已提交
35
/*
 * Ask the node, through its registered requiredFp callback, whether it
 * has to be started, log the answer and return it.
 */
static bool dndRequireNode(SMgmtWrapper *pMgmt) {
  const bool needed = (*pMgmt->fp.requiredFp)(pMgmt);

  if (needed) {
    dDebug("node:%s, need to start", pMgmt->name);
  } else {
    dDebug("node:%s, no need to start", pMgmt->name);
  }

  return needed;
}
S
Shengliang Guan 已提交
44

S
shm  
Shengliang Guan 已提交
45 46
/* Invoke the node's registered open callback and pass its result through. */
static int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
  return (*pWrapper->fp.openFp)(pWrapper);
}

S
shm  
Shengliang Guan 已提交
47 48 49
/*
 * Shut a node down.
 * The close callback runs only when the node is flagged as required, and
 * the flag is cleared afterwards. Any attached proc object is cleaned up
 * and the pointer reset.
 */
static void dndCloseNode(SMgmtWrapper *pWrapper) {
  if (pWrapper->required) {
    (*pWrapper->fp.closeFp)(pWrapper);
    pWrapper->required = false;
  }

  if (!pWrapper->pProc) {
    return;
  }

  taosProcCleanup(pWrapper->pProc);
  pWrapper->pProc = NULL;
}
S
shm  
Shengliang Guan 已提交
57

S
shm  
Shengliang Guan 已提交
58 59 60 61 62 63 64 65 66 67 68 69
/*
 * Populate a freshly allocated dnode object from the start-up options.
 *
 * Scalar settings and the disk list pointer are copied as-is; the string
 * settings are duplicated with strdup() so the dnode holds its own copies.
 *
 * Returns 0 on success. If any duplication fails, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; strings that were already
 * duplicated are left in place for the caller to release.
 */
static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) {
  /* Values copied directly. */
  pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes;
  pDnode->serverPort = pOption->serverPort;
  pDnode->pDisks = pOption->pDisks;
  pDnode->numOfDisks = pOption->numOfDisks;

  /* Private copies of the string options. */
  pDnode->dataDir = strdup(pOption->dataDir);
  pDnode->localEp = strdup(pOption->localEp);
  pDnode->localFqdn = strdup(pOption->localFqdn);
  pDnode->firstEp = strdup(pOption->firstEp);
  pDnode->secondEp = strdup(pOption->secondEp);

  pDnode->rebootTime = taosGetTimestampMs();

  const bool allCopied = pDnode->dataDir != NULL && pDnode->localEp != NULL && pDnode->localFqdn != NULL &&
                         pDnode->firstEp != NULL && pDnode->secondEp != NULL;
  if (!allCopied) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

S
shm  
Shengliang Guan 已提交
78
/*
 * Release everything owned by a dnode object, then the object itself.
 *
 * - frees the per-node path strings of every wrapper
 * - unlocks and closes the lock file when one is held
 * - frees the duplicated option strings
 * - frees pDnode
 *
 * pDnode must not be used by the caller after this returns.
 */
static void dndClearMemory(SDnode *pDnode) {
  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
    tfree(pMgmt->path);
  }

  if (pDnode->pLockFile != NULL) {
    taosUnLockFile(pDnode->pLockFile);
    taosCloseFile(&pDnode->pLockFile);
    pDnode->pLockFile = NULL;
  }

  tfree(pDnode->localEp);
  tfree(pDnode->localFqdn);
  tfree(pDnode->firstEp);
  tfree(pDnode->secondEp);
  tfree(pDnode->dataDir);

  /* Log before free(): a pointer's value is indeterminate once the object
   * it referred to has been released, so it must not be read afterwards. */
  dDebug("dnode object memory is cleared, data:%p", pDnode);
  free(pDnode);
}
S
Shengliang Guan 已提交
96

S
shm  
Shengliang Guan 已提交
97
SDnode *dndCreate(const SDnodeOpt *pOption) {
S
Shengliang Guan 已提交
98
  dInfo("start to create dnode object");
S
Shengliang Guan 已提交
99 100 101
  int32_t code = -1;
  char    path[PATH_MAX + 100];
  SDnode *pDnode = NULL;
S
Shengliang Guan 已提交
102

S
Shengliang Guan 已提交
103
  pDnode = calloc(1, sizeof(SDnode));
S
Shengliang Guan 已提交
104 105
  if (pDnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
106
    goto _OVER;
S
Shengliang Guan 已提交
107 108
  }

S
shm  
Shengliang Guan 已提交
109 110 111 112
  if (dndInitMemory(pDnode, pOption) != 0) {
    goto _OVER;
  }

S
shm  
Shengliang Guan 已提交
113
  dndSetStatus(pDnode, DND_STAT_INIT);
S
shm  
Shengliang Guan 已提交
114
  pDnode->pLockFile = dndCheckRunning(pDnode->dataDir);
S
shm  
Shengliang Guan 已提交
115 116 117 118 119 120 121 122
  if (pDnode->pLockFile == NULL) {
    goto _OVER;
  }

  if (dndInitServer(pDnode) != 0) {
    dError("failed to init trans server since %s", terrstr());
    goto _OVER;
  }
S
shm  
Shengliang Guan 已提交
123

S
shm  
Shengliang Guan 已提交
124 125
  if (dndInitClient(pDnode) != 0) {
    dError("failed to init trans client since %s", terrstr());
S
shm  
Shengliang Guan 已提交
126 127 128
    goto _OVER;
  }

S
shm  
Shengliang Guan 已提交
129 130 131 132 133 134
  dmGetMgmtFp(&pDnode->wrappers[DNODE]);
  mmGetMgmtFp(&pDnode->wrappers[MNODE]);
  vmGetMgmtFp(&pDnode->wrappers[VNODES]);
  qmGetMgmtFp(&pDnode->wrappers[QNODE]);
  smGetMgmtFp(&pDnode->wrappers[SNODE]);
  bmGetMgmtFp(&pDnode->wrappers[BNODE]);
S
Shengliang Guan 已提交
135

S
shm  
Shengliang Guan 已提交
136
  if (dndInitMsgHandle(pDnode) != 0) {
S
shm  
Shengliang Guan 已提交
137 138 139
    goto _OVER;
  }

S
Shengliang Guan 已提交
140
  for (ENodeType n = 0; n < NODE_MAX; ++n) {
S
shm  
Shengliang Guan 已提交
141
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
S
shm  
Shengliang Guan 已提交
142
    snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
S
shm  
Shengliang Guan 已提交
143
    pWrapper->path = strdup(path);
S
shm  
Shengliang Guan 已提交
144
    pWrapper->pDnode = pDnode;
S
shm  
Shengliang Guan 已提交
145
    if (pWrapper->path == NULL) {
S
Shengliang Guan 已提交
146 147 148 149
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

S
shm  
Shengliang Guan 已提交
150
    pWrapper->procType = PROC_SINGLE;
S
Shengliang Guan 已提交
151 152
  }

S
shm  
Shengliang Guan 已提交
153 154
  code = 0;

S
Shengliang Guan 已提交
155 156
_OVER:
  if (code != 0 && pDnode) {
S
shm  
Shengliang Guan 已提交
157
    dndClearMemory(pDnode);
S
Shengliang Guan 已提交
158 159 160
    dError("failed to create dnode object since %s", terrstr());
  } else {
    dInfo("dnode object is created, data:%p", pDnode);
S
Shengliang Guan 已提交
161 162
  }

S
Shengliang Guan 已提交
163 164 165
  return pDnode;
}

S
Shengliang Guan 已提交
166
/*
 * Stop and destroy a dnode object.
 *
 * A NULL argument is ignored. If the status is already DND_STAT_STOPPED the
 * call only logs an error and returns. Otherwise the status is switched to
 * DND_STAT_STOPPED, the transport server and client are cleaned up, every
 * node wrapper is closed and finally the object's memory is released.
 *
 * NOTE(review): the last log line prints pDnode after dndClearMemory() has
 * freed it; the pointer is only printed, not dereferenced.
 */
void dndClose(SDnode *pDnode) {
  if (pDnode == NULL) {
    return;
  }

  if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
    dError("dnode is shutting down, data:%p", pDnode);
    return;
  }

  dInfo("start to close dnode, data:%p", pDnode);
  dndSetStatus(pDnode, DND_STAT_STOPPED);

  dndCleanupServer(pDnode);
  dndCleanupClient(pDnode);

  for (ENodeType type = 0; type < NODE_MAX; ++type) {
    dndCloseNode(&pDnode->wrappers[type]);
  }

  dndClearMemory(pDnode);
  dInfo("dnode object is closed, data:%p", pDnode);
}
S
Shengliang Guan 已提交
188

S
shm  
Shengliang Guan 已提交
189 190
/*
 * Start every required node inside the current process.
 *
 * For each node type the "required" flag is refreshed; required nodes get
 * their directory created, are marked PROC_SINGLE and are opened. Once all
 * nodes are handled, dmStart() is called on the DNODE wrapper.
 *
 * Returns 0 on success, -1 as soon as any step fails.
 */
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
  dInfo("dnode run in single process mode");

  for (ENodeType type = 0; type < NODE_MAX; ++type) {
    SMgmtWrapper *pNode = &pDnode->wrappers[type];

    pNode->required = dndRequireNode(pNode);
    if (pNode->required) {
      if (taosMkDir(pNode->path) != 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        dError("failed to create dir:%s since %s", pNode->path, terrstr());
        return -1;
      }

      dInfo("node:%s, will start in single process", pNode->name);
      pNode->procType = PROC_SINGLE;

      if (dndOpenNode(pNode) != 0) {
        dError("node:%s, failed to start since %s", pNode->name, terrstr());
        return -1;
      }
    }
  }

  SMgmtWrapper *pDnodeWrapper = dndGetWrapper(pDnode, DNODE);
  if (dmStart(pDnodeWrapper) == 0) {
    return 0;
  }

  dError("failed to start dnode worker since %s", terrstr());
  return -1;
}

S
shm  
Shengliang Guan 已提交
220 221 222 223 224 225 226 227 228
/*
 * Clean up the transport server and close every node wrapper other than
 * the one whose type equals `except`.
 */
static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
  dndCleanupServer(pDnode);

  for (ENodeType type = 0; type < NODE_MAX; ++type) {
    if (type != except) {
      dndCloseNode(&pDnode->wrappers[type]);
    }
  }
}

S
shm  
Shengliang Guan 已提交
229 230
/*
 * Deliver a response over RPC.
 * Responses whose code is TSDB_CODE_DND_MNODE_NOT_DEPLOYED or
 * TSDB_CODE_APP_NOT_READY are handed to dmSendRedirectRsp() on the DNODE
 * wrapper; every other response goes straight to rpcSendResponse().
 */
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
  const bool needRedirect =
      (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED) || (pRsp->code == TSDB_CODE_APP_NOT_READY);

  if (!needRedirect) {
    rpcSendResponse(pRsp);
    return;
  }

  dmSendRedirectRsp(dndGetWrapper(pWrapper->pDnode, DNODE), pRsp);
}

/*
 * Send a response on behalf of a node.
 *
 * Unless the wrapper is of type PROC_CHILD the response is sent over RPC
 * directly. For PROC_CHILD it is pushed onto the parent queue instead,
 * retrying - with a taosMsleep(10) pause between attempts - until the
 * queue accepts it.
 */
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
  if (pWrapper->procType != PROC_CHILD) {
    dndSendRpcRsp(pWrapper, pRsp);
    return;
  }

  while (taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen) != 0) {
    taosMsleep(10);
  }
}

S
shm  
Shengliang Guan 已提交
252 253 254 255 256
/*
 * Overwrite the response code with TSDB_CODE_APP_NOT_READY and send the
 * response through dndSendRsp().
 */
void dndSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
  pRsp->code = TSDB_CODE_APP_NOT_READY;

  dndSendRsp(pWrapper, pRsp);
}

S
shm  
Shengliang Guan 已提交
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
/*
 * Consumer callback for the child queue.
 *
 * Attaches the body buffer to the message and dispatches it to the handler
 * registered for its message type. When the handler returns non-zero, an
 * error response carrying terrno is sent if the message type has its lowest
 * bit set (treated here as a request), and both the message and its body
 * are freed. On success nothing is freed here.
 *
 * msgLen and contLen are not used.
 */
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
  dTrace("msg:%p, get from child queue", pMsg);

  SRpcMsg *pReq = &pMsg->rpcMsg;
  pReq->pCont = pCont;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pReq->msgType)];
  if ((*handler)(pWrapper, pMsg) == 0) {
    return;
  }

  if (pReq->msgType & 1U) {
    SRpcMsg rsp = {.handle = pReq->handle, .ahandle = pReq->ahandle, .code = terrno};
    dndSendRsp(pWrapper, &rsp);
  }

  dTrace("msg:%p, is freed", pMsg);
  taosFreeQitem(pMsg);
  rpcFreeCont(pCont);
}

/*
 * Consumer callback for the parent queue.
 * Attaches the body buffer to the response, sends it over RPC and then
 * frees the response head. msgLen and contLen are not used.
 */
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRsp, int32_t msgLen, void *pCont, int32_t contLen) {
  dTrace("msg:%p, get from parent queue", pRsp);

  pRsp->pCont = pCont;
  dndSendRpcRsp(pWrapper, pRsp);

  free(pRsp);
}

S
shm  
Shengliang Guan 已提交
285
/*
 * Start the required nodes in multi-process mode.
 *
 * For each required node type its directory is created first. The DNODE
 * node is opened in the current process (PROC_SINGLE). For every other node
 * a proc object is created with taosProcInit(); when taosProcIsChild()
 * reports true the log is reset, all other nodes inherited from the parent
 * are closed and the node is opened (PROC_CHILD); otherwise the wrapper is
 * only marked PROC_PARENT. taosProcRun() is then called on the proc object.
 *
 * Returns 0 on success, -1 as soon as any step fails.
 */
static int32_t dndRunInMultiProcess(SDnode *pDnode) {
  dInfo("dnode run in multi process mode");

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    pWrapper->required = dndRequireNode(pWrapper);
    if (!pWrapper->required) continue;

    if (taosMkDir(pWrapper->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
      return -1;
    }

    if (n == DNODE) {
      dInfo("node:%s, will start in parent process", pWrapper->name);
      pWrapper->procType = PROC_SINGLE;
      if (dndOpenNode(pWrapper) != 0) {
        dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
        return -1;
      }
      continue;
    }

    SProcCfg  cfg = {.childQueueSize = 1024 * 1024 * 2,  // size will be a configuration item
                     .childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue,
                     .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
                     .childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
                     .childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                     .childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                     .parentQueueSize = 1024 * 1024 * 2,  // size will be a configuration item
                     .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
                     .parentdMallocHeadFp = (ProcMallocFp)malloc,
                     .parentFreeHeadFp = (ProcFreeFp)free,
                     .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                     .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                     .testFlag = 0,
                     .pParent = pWrapper,
                     .name = pWrapper->name};
    SProcObj *pProc = taosProcInit(&cfg);
    if (pProc == NULL) {
      dError("node:%s, failed to fork since %s", pWrapper->name, terrstr());
      return -1;
    }

    pWrapper->pProc = pProc;

    if (taosProcIsChild(pProc)) {
      dInfo("node:%s, will start in child process", pWrapper->name);
      pWrapper->procType = PROC_CHILD;
      dndResetLog(pWrapper);

      dInfo("node:%s, clean up resources inherited from parent", pWrapper->name);
      dndClearNodesExecpt(pDnode, n);

      dInfo("node:%s, will be initialized in child process", pWrapper->name);
      /* A node that failed to open must not go on to serve its queue. */
      if (dndOpenNode(pWrapper) != 0) {
        dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
        return -1;
      }
    } else {
      dInfo("node:%s, will not start in parent process", pWrapper->name);
      pWrapper->procType = PROC_PARENT;
    }

    if (taosProcRun(pProc) != 0) {
      dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  /* NOTE(review): within this function the DNODE wrapper is only ever set to
   * PROC_SINGLE, so the PROC_PARENT test below looks like it can never be
   * true and dmStart() would not run in multi-process mode. The argument
   * (pWrapper->pMgmt) also differs from the single-process path, which
   * passes the wrapper itself - confirm the intended condition/argument. */
  SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
  if (pWrapper->procType == PROC_PARENT && dmStart(pWrapper->pMgmt) != 0) {
    dError("failed to start dnode worker since %s", terrstr());
    return -1;
  }

  return 0;
}

/*
 * Run the dnode until a stop event arrives.
 *
 * Starts the nodes in single- or multi-process mode depending on
 * tsMultiProcess, switches the status to DND_STAT_RUNNING, reports start-up
 * and then polls pDnode->event, calling taosMsleep(100) between checks,
 * until it equals DND_EVENT_STOP.
 *
 * Returns 0 after a stop event, -1 if the nodes could not be started.
 */
int32_t dndRun(SDnode *pDnode) {
  if (tsMultiProcess != 0) {
    if (dndRunInMultiProcess(pDnode) != 0) {
      dError("failed to run dnode in multi process mode since %s", terrstr());
      return -1;
    }
  } else {
    if (dndRunInSingleProcess(pDnode) != 0) {
      dError("failed to run dnode in single process mode since %s", terrstr());
      return -1;
    }
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  while (pDnode->event != DND_EVENT_STOP) {
    taosMsleep(100);
  }
  dInfo("dnode is about to stop");

  return 0;
}
S
shm  
Shengliang Guan 已提交
388

S
shm  
Shengliang Guan 已提交
389
/*
 * Record an event on the dnode object.
 * The event is logged and stored in pDnode->event, which dndRun() polls.
 */
void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
  dInfo("dnode object receive event %d, data:%p", event, pDnode);

  pDnode->event = event;
}

S
shm  
Shengliang Guan 已提交
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
/*
 * Fill a node message from an incoming RPC message.
 *
 * When the message type has its lowest bit set (treated here as a request),
 * the connection info is looked up from the RPC handle; a failed lookup
 * sets terrno to TSDB_CODE_MND_NO_USER_FROM_CONN and returns -1. Otherwise
 * the user name (all zeros when no lookup was done) and a copy of the RPC
 * message are stored in pMsg and 0 is returned.
 *
 * pEpSet is not used.
 */
static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SRpcConnInfo conn = {0};
  const bool   isReq = (pRpc->msgType & 1U) != 0;

  if (isReq && rpcGetConnInfo(pRpc->handle, &conn) != 0) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
    return -1;
  }

  pMsg->rpcMsg = *pRpc;
  memcpy(pMsg->user, conn.user, TSDB_USER_LEN);

  return 0;
}

/*
 * Entry point for an RPC message addressed to a node.
 *
 * - A TDMT_MND_STATUS_RSP that carries a non-empty endpoint set first
 *   updates the mnode endpoint set through the DNODE wrapper.
 * - The handler registered for the message type is looked up; a missing
 *   handler fails with TSDB_CODE_MSG_NOT_PROCESSED.
 * - A queue item is allocated and filled from the RPC message.
 * - PROC_SINGLE: the handler is called directly.
 *   PROC_PARENT: the message is pushed onto the child queue.
 *   Any other type: rejected with TSDB_CODE_MEMORY_CORRUPTED.
 *
 * Clean-up:
 * - success, PROC_SINGLE: nothing is freed here.
 * - success, PROC_PARENT: the local item and the RPC body are freed after
 *   the message was put on the child queue.
 * - failure: an error response carrying terrno is sent when the message
 *   type has its lowest bit set (treated here as a request), then the item
 *   (if one was allocated) and the RPC body are freed.
 */
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
  if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
    dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE), pEpSet);
  }

  int32_t   code = -1;
  SNodeMsg *pMsg = NULL;

  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  if (msgFp == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pMsg = taosAllocateQitem(sizeof(SNodeMsg));
  if (pMsg == NULL) {
    /* Make sure the error response below reports this failure rather than
     * whatever terrno happened to hold before. */
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if (dndBuildMsg(pMsg, pRpc, pEpSet) != 0) {
    goto _OVER;
  }

  dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user);

  if (pWrapper->procType == PROC_SINGLE) {
    code = (*msgFp)(pWrapper, pMsg);
  } else if (pWrapper->procType == PROC_PARENT) {
    code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
  } else {
    terrno = TSDB_CODE_MEMORY_CORRUPTED;
    dError("msg:%p, won't be processed for it is child process", pMsg);
  }

_OVER:
  if (code == 0) {
    if (pWrapper->procType != PROC_PARENT) {
      /* Handled in-process: nothing is released here on success. */
      return;
    }
  } else {
    dError("msg:%p, failed to process since %s", pMsg, terrstr());
    bool isReq = (pRpc->msgType & 1U);
    if (isReq) {
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
      dndSendRsp(pWrapper, &rsp);
    }
  }

  /* pMsg is still NULL when the failure happened before allocation. */
  if (pMsg != NULL) {
    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
  }
  rpcFreeCont(pRpc->pCont);
}