dnodeTransport.c 13.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

/* This file is mainly responsible for the communication between dnodes. Each
 * dnode works as both server and client. A dnode may send status, grant and
 * config messages to the mnode; the mnode may send create/alter/drop
 * table/vnode messages to a dnode. All these messages are handled here.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
23 24 25 26
#include "dnodeTransport.h"
#include "dnodeDnode.h"
#include "dnodeMnode.h"
#include "dnodeVnodes.h"
S
Shengliang Guan 已提交
27

28
static struct {
S
Shengliang Guan 已提交
29
  void *peerRpc;
S
Shengliang Guan 已提交
30
  void *shellRpc;
S
Shengliang Guan 已提交
31
  void *clientRpc;
S
Shengliang Guan 已提交
32
  MsgFp msgFp[TSDB_MSG_TYPE_MAX];
33 34
} tsTrans;

S
Shengliang Guan 已提交
35 36
static void dnodeInitMsgFp() {
  // msg from client to dnode
S
Shengliang Guan 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
  tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg;
S
Shengliang Guan 已提交
54 55

  // msg from client to mnode
S
Shengliang Guan 已提交
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
  tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
87 88

  // message from mnode to dnode
S
Shengliang Guan 已提交
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = dnodeProcessDnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
111 112

  // message from dnode to mnode
S
Shengliang Guan 已提交
113 114 115 116 117 118
  tsTrans.msgFp[TSDB_MSG_TYPE_AUTH] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dnodeProcessDnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_GRANT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dnodeProcessDnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_STATUS] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessDnodeMsg;
S
Shengliang Guan 已提交
119 120
}

S
Shengliang Guan 已提交
121
/*
 * RPC callback for requests received on the peer server endpoint.
 *
 * pMsg   - incoming request; its handle is reused for the response
 * pEpSet - endpoint set forwarded unchanged to the selected handler
 *
 * Network-test messages are always passed to the dnode module. Every other
 * request is answered with an error code (and its content released) when the
 * type is outside the dispatch table, the dnode is not running, the content
 * is missing, or no handler is registered; otherwise the handler is invoked.
 */
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SEpSet *pEpSet) {
  SRpcMsg rspMsg = {.handle = pMsg->handle};
  int32_t msgType = pMsg->msgType;

  if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
    dnodeProcessDnodeMsg(pMsg, pEpSet);
    return;
  }

  // The type comes from the wire: validate it before it is used as an index
  // into tsTrans.msgFp[] (and taosMsg[], assumed to cover the same range).
  if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
    dError("RPC %p, peer req type:%d is out of range, not processed", pMsg->handle, msgType);
    rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&rspMsg);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
    rspMsg.code = TSDB_CODE_APP_NOT_READY;
    rpcSendResponse(&rspMsg);
    rpcFreeCont(pMsg->pCont);
    dTrace("RPC %p, peer req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
    return;
  }

  if (pMsg->pCont == NULL) {
    rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN;
    rpcSendResponse(&rspMsg);
    return;
  }

  MsgFp fp = tsTrans.msgFp[msgType];
  if (fp != NULL) {
    dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]);
    (*fp)(pMsg, pEpSet);
  } else {
    dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[msgType]);
    rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&rspMsg);
    rpcFreeCont(pMsg->pCont);
  }
}

S
Shengliang Guan 已提交
156
static int32_t dnodeInitPeerServer() {
157 158 159 160 161
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = tsDnodeDnodePort;
  rpcInit.label = "DND-S";
  rpcInit.numOfThreads = 1;
162
  rpcInit.cfp = dnodeProcessPeerReq;
163 164 165 166
  rpcInit.sessions = TSDB_MAX_VNODES << 4;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;

S
Shengliang Guan 已提交
167 168
  tsTrans.peerRpc = rpcOpen(&rpcInit);
  if (tsTrans.peerRpc == NULL) {
169 170 171 172 173 174 175 176
    dError("failed to init peer rpc server");
    return -1;
  }

  dInfo("dnode peer rpc server is initialized");
  return 0;
}

S
Shengliang Guan 已提交
177 178 179 180
/*
 * Close the peer RPC server endpoint if it is open and reset the handle so
 * a repeated call does nothing.
 */
static void dnodeCleanupPeerServer() {
  if (!tsTrans.peerRpc) {
    return;
  }

  rpcClose(tsTrans.peerRpc);
  tsTrans.peerRpc = NULL;
  dInfo("dnode peer server is closed");
}

S
Shengliang Guan 已提交
185
/*
 * RPC callback for responses received on the client endpoint.
 *
 * pMsg   - response message; a NULL pointer is ignored
 * pEpSet - endpoint set forwarded unchanged to the selected handler
 *
 * The response is dropped when its type is outside the dispatch table or the
 * dnode is stopped; otherwise it is passed to the registered handler, if any.
 * The message content is released here in every case.
 */
static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SEpSet *pEpSet) {
  // Must be tested before the first dereference below.
  if (pMsg == NULL) return;

  int32_t msgType = pMsg->msgType;

  // Validate the wire value before it is used as an index into
  // tsTrans.msgFp[] (and taosMsg[], assumed to cover the same range).
  if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
    dError("RPC %p, peer rsp type:%d is out of range, not processed", pMsg->handle, msgType);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
    if (pMsg->pCont == NULL) return;
    dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  MsgFp fp = tsTrans.msgFp[msgType];
  if (fp != NULL) {
    dTrace("RPC %p, peer rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code));
    (*fp)(pMsg, pEpSet);
  } else {
    dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
  }

  // NOTE(review): the content is freed even after a handler ran; confirm that
  // handlers do not keep or free pMsg->pCont themselves.
  rpcFreeCont(pMsg->pCont);
}

S
Shengliang Guan 已提交
206
static int32_t dnodeInitClient() {
S
Shengliang Guan 已提交
207 208
  char secret[TSDB_KEY_LEN] = "secret";

209 210
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
S
Shengliang Guan 已提交
211
  rpcInit.label = "DND-C";
212
  rpcInit.numOfThreads = 1;
S
Shengliang Guan 已提交
213 214 215 216 217 218 219
  rpcInit.cfp = dnodeProcessPeerRsp;
  rpcInit.sessions = TSDB_MAX_VNODES << 4;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = "t";
  rpcInit.ckey = "key";
  rpcInit.secret = secret;
220

221 222
  tsTrans.clientRpc = rpcOpen(&rpcInit);
  if (tsTrans.clientRpc == NULL) {
223 224 225 226 227 228 229 230
    dError("failed to init peer rpc client");
    return -1;
  }

  dInfo("dnode peer rpc client is initialized");
  return 0;
}

S
Shengliang Guan 已提交
231
static void dnodeCleanupClient() {
232 233 234
  if (tsTrans.clientRpc) {
    rpcClose(tsTrans.clientRpc);
    tsTrans.clientRpc = NULL;
235 236 237 238
    dInfo("dnode peer rpc client is closed");
  }
}

S
Shengliang Guan 已提交
239
/*
 * RPC callback for requests received on the shell server endpoint.
 *
 * pMsg   - incoming request; its handle is reused for the response
 * pEpSet - endpoint set forwarded unchanged to the selected handler
 *
 * The request is answered with an error code (and its content released) when
 * the type is outside the dispatch table, the dnode is stopped or not yet
 * running, the content is missing, or no handler is registered; otherwise
 * the registered handler is invoked.
 */
static void dnodeProcessShellReq(SRpcMsg *pMsg, SEpSet *pEpSet) {
  SRpcMsg rspMsg = {.handle = pMsg->handle};
  int32_t msgType = pMsg->msgType;

  // The type is supplied by the client: validate it before it is used as an
  // index into tsTrans.msgFp[] (and taosMsg[], assumed to cover the same range).
  if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
    dError("RPC %p, shell req type:%d is out of range, not processed", pMsg->handle, msgType);
    rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&rspMsg);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
    dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]);
    rspMsg.code = TSDB_CODE_DND_EXITING;
    rpcSendResponse(&rspMsg);
    rpcFreeCont(pMsg->pCont);
    return;
  } else if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
    dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
    rspMsg.code = TSDB_CODE_APP_NOT_READY;
    rpcSendResponse(&rspMsg);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  if (pMsg->pCont == NULL) {
    rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN;
    rpcSendResponse(&rspMsg);
    return;
  }

  MsgFp fp = tsTrans.msgFp[msgType];
  if (fp != NULL) {
    dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]);
    (*fp)(pMsg, pEpSet);
  } else {
    dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[msgType]);
    rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&rspMsg);
    rpcFreeCont(pMsg->pCont);
  }
}

S
Shengliang Guan 已提交
275
/*
 * Send rpcMsg through the client endpoint to the mnode endpoint set obtained
 * from dnodeGetMnodeEpSetForPeer() and store the reply in rpcRsp.
 */
static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
  SEpSet mnodeEpSet = {0};

  dnodeGetMnodeEpSetForPeer(&mnodeEpSet);
  rpcSendRecv(tsTrans.clientRpc, &mnodeEpSet, rpcMsg, rpcRsp);
}

281
static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
S
Shengliang Guan 已提交
282
  int32_t code = dnodeGetUserAuthFromMnode(user, spi, encrypt, secret, ckey);
283 284 285 286 287 288
  if (code != TSDB_CODE_APP_NOT_READY) return code;

  SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
  tstrncpy(pMsg->user, user, sizeof(pMsg->user));

  dDebug("user:%s, send auth msg to mnodes", user);
S
Shengliang Guan 已提交
289
  SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TSDB_MSG_TYPE_AUTH};
290
  SRpcMsg rpcRsp = {0};
291
  dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
292 293 294 295 296

  if (rpcRsp.code != 0) {
    dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
  } else {
    dDebug("user:%s, auth msg received from mnodes", user);
S
Shengliang Guan 已提交
297
    SAuthRsp *pRsp = rpcRsp.pCont;
298 299 300 301 302 303 304 305 306 307
    memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
    memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
    *spi = pRsp->spi;
    *encrypt = pRsp->encrypt;
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

S
Shengliang Guan 已提交
308
static int32_t dnodeInitShellServer() {
309 310 311 312 313 314 315
  int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
  if (numOfThreads < 1) {
    numOfThreads = 1;
  }

  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
S
Shengliang Guan 已提交
316 317
  rpcInit.localPort = tsDnodeShellPort;
  rpcInit.label = "SHELL";
318
  rpcInit.numOfThreads = numOfThreads;
S
Shengliang Guan 已提交
319 320 321 322 323
  rpcInit.cfp = dnodeProcessShellReq;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.afp = dnodeRetrieveUserAuthInfo;
324

325 326
  tsTrans.shellRpc = rpcOpen(&rpcInit);
  if (tsTrans.shellRpc == NULL) {
327 328 329 330 331 332 333 334
    dError("failed to init shell rpc server");
    return -1;
  }

  dInfo("dnode shell rpc server is initialized");
  return 0;
}

S
Shengliang Guan 已提交
335
static void dnodeCleanupShellServer() {
336 337 338
  if (tsTrans.shellRpc) {
    rpcClose(tsTrans.shellRpc);
    tsTrans.shellRpc = NULL;
339 340 341
  }
}

342 343
/*
 * Initialize the transport layer: fill the message dispatch table, then open
 * the client endpoint, the peer server and the shell server, in that order.
 * Returns 0 on success. On failure returns -1 after closing the endpoints
 * that were already opened.
 */
int32_t dnodeInitTrans() {
  // The table must be populated before any endpoint can deliver a message;
  // otherwise every lookup in tsTrans.msgFp yields NULL.
  dnodeInitMsgFp();

  if (dnodeInitClient() != 0) {
    return -1;
  }

  if (dnodeInitPeerServer() != 0) {
    dnodeCleanupClient();
    return -1;
  }

  if (dnodeInitShellServer() != 0) {
    dnodeCleanupPeerServer();
    dnodeCleanupClient();
    return -1;
  }

  return 0;
}

358
/*
 * Shut down the transport layer: shell server first, then the peer server,
 * and the client endpoint last.
 */
void dnodeCleanupTrans() {
  dnodeCleanupShellServer();  // shell-facing endpoint
  dnodeCleanupPeerServer();   // peer-facing endpoint
  dnodeCleanupClient();       // outgoing client endpoint
}
S
Shengliang Guan 已提交
363

S
Shengliang Guan 已提交
364
/*
 * Send rpcMsg through the client endpoint to the given endpoint set.
 * The last argument of rpcSendRequest() is passed as NULL.
 */
void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg) {
  rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL);
}
S
Shengliang Guan 已提交
365 366

/*
 * Send rpcMsg to the mnode endpoint set obtained from
 * dnodeGetMnodeEpSetForPeer().
 */
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
  SEpSet mnodeEpSet = {0};

  dnodeGetMnodeEpSetForPeer(&mnodeEpSet);
  dnodeSendMsgToDnode(&mnodeEpSet, rpcMsg);
}