dnodeTrans.c 14.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

/* this file is mainly responsible for the communication between DNODEs. Each 
S
Shengliang Guan 已提交
17
 * dnode works as both server and client. SDnode may send status, grant, config
18 19 20 21 22 23 24 25 26 27 28 29 30 31
 * messages to mnode, mnode may send create/alter/drop table/vnode messages 
 * to dnode. All theses messages are handled from here
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "dnodeMain.h"
#include "dnodeMnodeEps.h"
#include "dnodeStatus.h"
#include "dnodeTrans.h"
#include "vnode.h"
#include "mnode.h"

32
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
S
Shengliang Guan 已提交
33
  SDnode * dnode = dnodeInst();
34 35 36 37
  SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};

  if (pMsg->pCont == NULL) return;
  if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
38
    dnodeProcessStartupReq(pMsg);
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
    return;
  }

  if (dnode->main->runStatus != TD_RUN_STAT_RUNNING) {
    rspMsg.code = TSDB_CODE_APP_NOT_READY;
    rpcSendResponse(&rspMsg);
    rpcFreeCont(pMsg->pCont);
    dTrace("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
    return;
  }

  if (pMsg->pCont == NULL) {
    rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN;
    rpcSendResponse(&rspMsg);
    return;
  }

56 57 58
  RpcMsgFp fp = dnode->trans->peerMsgFp[pMsg->msgType];
  if (fp != NULL) {
    (*fp)(pMsg);
59 60 61 62 63 64 65 66
  } else {
    dDebug("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]);
    rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&rspMsg);
    rpcFreeCont(pMsg->pCont);
  }
}

S
Shengliang Guan 已提交
67
int32_t dnodeInitServer(SDnTrans *trans) {
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE]  = vnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE]    = vnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE]   = vnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE]   = vnodeProcessMsg;

  trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE]  = vnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE]   = vnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE]    = vnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE]    = vnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM]  = vnodeProcessMsg;

  trans->peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE]  = dnodeProcessConfigDnodeReq;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE]  = dnodeProcessCreateMnodeReq;

  trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE]  = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE]  = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_DM_AUTH]          = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_DM_GRANT]         = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_DM_STATUS]        = mnodeProcessMsg;
88 89 90 91 92 93

  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = tsDnodeDnodePort;
  rpcInit.label = "DND-S";
  rpcInit.numOfThreads = 1;
94
  rpcInit.cfp = dnodeProcessPeerReq;
95 96 97 98 99 100 101 102 103 104 105 106 107 108
  rpcInit.sessions = TSDB_MAX_VNODES << 4;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;

  trans->serverRpc = rpcOpen(&rpcInit);
  if (trans->serverRpc == NULL) {
    dError("failed to init peer rpc server");
    return -1;
  }

  dInfo("dnode peer rpc server is initialized");
  return 0;
}

S
Shengliang Guan 已提交
109
void dnodeCleanupServer(SDnTrans *trans) {
110 111 112 113 114 115 116
  if (trans->serverRpc) {
    rpcClose(trans->serverRpc);
    trans->serverRpc = NULL;
    dInfo("dnode peer server is closed");
  }
}

117
static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
S
Shengliang Guan 已提交
118
  SDnode *dnode = dnodeInst();
119 120 121 122 123 124 125 126 127 128 129
  if (dnode->main->runStatus == TD_RUN_STAT_STOPPED) {
    if (pMsg == NULL || pMsg->pCont == NULL) return;
    dTrace("msg:%p is ignored since dnode is stopping", pMsg);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
    dnodeUpdateMnodeFromPeer(dnode->meps, pEpSet);
  }

130 131 132
  RpcMsgFp fp = dnode->trans->peerMsgFp[pMsg->msgType];
  if (fp != NULL) {
    (*fp)(pMsg);
133 134 135 136 137 138 139 140 141 142 143
  } else {
    dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]);
    SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
    rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&rspMsg);
    rpcFreeCont(pMsg->pCont);
  }

  rpcFreeCont(pMsg->pCont);
}

S
Shengliang Guan 已提交
144
int32_t dnodeInitClient(SDnTrans *trans) {
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP]  = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP]    = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP]   = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP]   = mnodeProcessMsg;

  trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP]  = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP]   = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP]    = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP]    = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP]  = mnodeProcessMsg;

  trans->peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP]  = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP]  = mnodeProcessMsg;

  trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP]  = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP]  = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_DM_AUTH_RSP]          = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_DM_GRANT_RSP]         = mnodeProcessMsg;
  trans->peerMsgFp[TSDB_MSG_TYPE_DM_STATUS_RSP]        = dnodeProcessStatusRsp;
165 166 167 168 169 170

  char secret[TSDB_KEY_LEN] = "secret";
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.label        = "DND-C";
  rpcInit.numOfThreads = 1;
171
  rpcInit.cfp          = dnodeProcessRspFromPeer;
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
  rpcInit.sessions     = TSDB_MAX_VNODES << 4;
  rpcInit.connType     = TAOS_CONN_CLIENT;
  rpcInit.idleTime     = tsShellActivityTimer * 1000;
  rpcInit.user         = "t";
  rpcInit.ckey         = "key";
  rpcInit.secret       = secret;

  trans->clientRpc = rpcOpen(&rpcInit);
  if (trans->clientRpc == NULL) {
    dError("failed to init peer rpc client");
    return -1;
  }

  dInfo("dnode peer rpc client is initialized");
  return 0;
}

S
Shengliang Guan 已提交
189
void dnodeCleanupClient(SDnTrans *trans) {
190 191 192 193 194 195 196
  if (trans->clientRpc) {
    rpcClose(trans->clientRpc);
    trans->clientRpc = NULL;
    dInfo("dnode peer rpc client is closed");
  }
}

197
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
S
Shengliang Guan 已提交
198
  SDnode * dnode = dnodeInst();
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
  SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};

  if (pMsg->pCont == NULL) return;
  if (dnode->main->runStatus == TD_RUN_STAT_STOPPED) {
    dError("RPC %p, shell msg:%s is ignored since dnode exiting", pMsg->handle, taosMsg[pMsg->msgType]);
    rpcMsg.code = TSDB_CODE_DND_EXITING;
    rpcSendResponse(&rpcMsg);
    rpcFreeCont(pMsg->pCont);
    return;
  } else if (dnode->main->runStatus != TD_RUN_STAT_RUNNING) {
    dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
    rpcMsg.code = TSDB_CODE_APP_NOT_READY;
    rpcSendResponse(&rpcMsg);
    rpcFreeCont(pMsg->pCont);
    return;
  }

S
Shengliang Guan 已提交
216
  SDnTrans *trans = dnode->trans;
217 218 219 220 221 222
  if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
    atomic_fetch_add_32(&trans->queryReqNum, 1);
  } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
    atomic_fetch_add_32(&trans->submitReqNum, 1);
  } else {}

223 224 225
  RpcMsgFp fp = trans->shellMsgFp[pMsg->msgType];
  if (fp != NULL) {
    (*fp)(pMsg);
226 227 228 229 230 231 232 233 234 235 236
  } else {
    dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
    rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&rpcMsg);
    rpcFreeCont(pMsg->pCont);
  }
}

static int32_t dnodeAuthNetTest(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  if (strcmp(user, "nettestinternal") == 0) {
    char pass[32] = {0};
237
    taosEncryptPass((uint8_t *)user, strlen(user), pass);
238 239 240 241 242 243 244 245 246 247 248
    *spi = 0;
    *encrypt = 0;
    *ckey = 0;
    memcpy(secret, pass, TSDB_KEY_LEN);
    dTrace("nettest user is authorized");
    return 0;
  }

  return -1;
}

249
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
S
Shengliang Guan 已提交
250
  SDnode *dnode = dnodeInst();
251 252 253
  rpcSendRequest(dnode->trans->clientRpc, epSet, rpcMsg, NULL);
}

254
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
S
Shengliang Guan 已提交
255
  SDnode *   dnode = dnodeInst();
256 257
  SRpcEpSet epSet = {0};
  dnodeGetEpSetForPeer(dnode->meps, &epSet);
258
  dnodeSendMsgToDnode(&epSet, rpcMsg);
259 260
}

261
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
S
Shengliang Guan 已提交
262
  SDnode *   dnode = dnodeInst();
263 264 265 266 267
  SRpcEpSet epSet = {0};
  dnodeGetEpSetForPeer(dnode->meps, &epSet);
  rpcSendRecv(dnode->trans->clientRpc, &epSet, rpcMsg, rpcRsp);
}

268
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
S
Shengliang Guan 已提交
269
  SDnode *dnode = dnodeInst();
270 271 272
  rpcSendRecv(dnode->trans->clientRpc, epSet, rpcMsg, rpcRsp);
}

273
static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
274 275
  if (dnodeAuthNetTest(user, spi, encrypt, secret, ckey) == 0) return 0;

276
  int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
277 278 279 280 281 282 283 284 285 286 287 288
  if (code != TSDB_CODE_APP_NOT_READY) return code;

  SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
  tstrncpy(pMsg->user, user, sizeof(pMsg->user));

  SRpcMsg rpcMsg = {0};
  rpcMsg.pCont = pMsg;
  rpcMsg.contLen = sizeof(SAuthMsg);
  rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH;

  dDebug("user:%s, send auth msg to mnodes", user);
  SRpcMsg rpcRsp = {0};
289
  dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305

  if (rpcRsp.code != 0) {
    dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
  } else {
    SAuthRsp *pRsp = rpcRsp.pCont;
    dDebug("user:%s, auth msg received from mnodes", user);
    memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
    memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
    *spi = pRsp->spi;
    *encrypt = pRsp->encrypt;
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

S
Shengliang Guan 已提交
306
int32_t dnodeInitShell(SDnTrans *trans) {
307 308 309 310
  trans->shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_QUERY]  = vnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_FETCH]  = vnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg;
311 312

  // the following message shall be treated as mnode write
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT]     = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT]      = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT]       = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER]     = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER]      = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER]       = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]    = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE]      = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB]       = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP]       = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB]         = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB]         = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP]         = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION]   = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB]        = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP]        = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]    = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE]      = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE]     = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]    = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY]      = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM]     = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN]       = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]    = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE]   = mnodeProcessMsg;
339 340

  // the following message shall be treated as mnode query
341 342 343 344 345 346 347 348 349 350 351
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT]     = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_CONNECT]       = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_USE_DB]        = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META]    = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META]   = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_SHOW]          = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE]      = mnodeProcessMsg;
  trans->shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessMsg;

  trans->shellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST]     = dnodeProcessStartupReq;
352 353 354 355 356 357 358 359 360 361 362

  int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
  if (numOfThreads < 1) {
    numOfThreads = 1;
  }

  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort    = tsDnodeShellPort;
  rpcInit.label        = "SHELL";
  rpcInit.numOfThreads = numOfThreads;
363
  rpcInit.cfp          = dnodeProcessMsgFromShell;
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
  rpcInit.sessions     = tsMaxShellConns;
  rpcInit.connType     = TAOS_CONN_SERVER;
  rpcInit.idleTime     = tsShellActivityTimer * 1000;
  rpcInit.afp          = dnodeRetrieveUserAuthInfo;

  trans->shellRpc = rpcOpen(&rpcInit);
  if (trans->shellRpc == NULL) {
    dError("failed to init shell rpc server");
    return -1;
  }

  dInfo("dnode shell rpc server is initialized");
  return 0;
}

S
Shengliang Guan 已提交
379
void dnodeCleanupShell(SDnTrans *trans) {
380 381 382 383 384 385
  if (trans->shellRpc) {
    rpcClose(trans->shellRpc);
    trans->shellRpc = NULL;
  }
}

S
Shengliang Guan 已提交
386 387
int32_t dnodeInitTrans(SDnTrans **out) {
  SDnTrans *trans = calloc(1, sizeof(SDnTrans));
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
  if (trans == NULL) return -1;

  *out = trans;

  if (dnodeInitClient(trans) != 0) {
    return -1;
  }

  if (dnodeInitServer(trans) != 0) {
    return -1;
  }

  if (dnodeInitShell(trans) != 0) {
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
407 408
void dnodeCleanupTrans(SDnTrans **out) {
  SDnTrans* trans = *out;
409 410 411 412 413 414 415 416
  *out = NULL;

  dnodeCleanupShell(trans);
  dnodeCleanupServer(trans);
  dnodeCleanupClient(trans);

  free(trans);
}