dnodeTransport.c 13.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

/* this file is mainly responsible for the communication between DNODEs. Each 
17
 * dnode works as both server and client. Dnode may send status, grant, config
18 19 20 21 22
 * messages to mnode, mnode may send create/alter/drop table/vnode messages 
 * to dnode. All theses messages are handled from here
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
23 24 25 26
#include "dnodeTransport.h"
#include "dnodeDnode.h"
#include "dnodeMnode.h"
#include "dnodeVnodes.h"
27
#include "mnode.h"
S
Shengliang Guan 已提交
28

29
static struct {
S
Shengliang Guan 已提交
30
  void *peerRpc;
S
Shengliang Guan 已提交
31
  void *shellRpc;
S
Shengliang Guan 已提交
32
  void *clientRpc;
S
Shengliang Guan 已提交
33
  MsgFp msgFp[TSDB_MSG_TYPE_MAX];
34 35
} tsTrans;

S
Shengliang Guan 已提交
36 37
static void dnodeInitMsgFp() {
  // msg from client to dnode
S
Shengliang Guan 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
  tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg;
S
Shengliang Guan 已提交
55 56

  // msg from client to mnode
S
Shengliang Guan 已提交
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
  tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
88 89

  // message from mnode to dnode
S
Shengliang Guan 已提交
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodesMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = dnodeProcessDnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
112 113

  // message from dnode to mnode
S
Shengliang Guan 已提交
114 115 116 117 118 119
  tsTrans.msgFp[TSDB_MSG_TYPE_AUTH] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dnodeProcessDnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_GRANT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dnodeProcessDnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_STATUS] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessDnodeMsg;
S
Shengliang Guan 已提交
120 121
}

122
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
S
Shengliang Guan 已提交
123
  SRpcMsg rspMsg = {.handle = pMsg->handle};
S
Shengliang Guan 已提交
124
  int32_t msgType = pMsg->msgType;
125

S
Shengliang Guan 已提交
126
  if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
S
Shengliang Guan 已提交
127
    dnodeProcessDnodeMsg(pMsg, pEpSet);
128 129 130
    return;
  }

S
Shengliang Guan 已提交
131
  if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
132 133 134
    rspMsg.code = TSDB_CODE_APP_NOT_READY;
    rpcSendResponse(&rspMsg);
    rpcFreeCont(pMsg->pCont);
S
Shengliang Guan 已提交
135
    dTrace("RPC %p, peer req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
136 137 138 139 140 141 142 143 144
    return;
  }

  if (pMsg->pCont == NULL) {
    rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN;
    rpcSendResponse(&rspMsg);
    return;
  }

S
Shengliang Guan 已提交
145
  MsgFp fp = tsTrans.msgFp[msgType];
146
  if (fp != NULL) {
S
Shengliang Guan 已提交
147
    dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]);
S
Shengliang Guan 已提交
148
    (*fp)(pMsg, pEpSet);
149
  } else {
S
Shengliang Guan 已提交
150
    dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[msgType]);
151 152 153 154 155 156
    rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&rspMsg);
    rpcFreeCont(pMsg->pCont);
  }
}

S
Shengliang Guan 已提交
157
static int32_t dnodeInitPeerServer() {
158 159 160 161 162
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = tsDnodeDnodePort;
  rpcInit.label = "DND-S";
  rpcInit.numOfThreads = 1;
163
  rpcInit.cfp = dnodeProcessPeerReq;
164 165 166 167
  rpcInit.sessions = TSDB_MAX_VNODES << 4;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;

S
Shengliang Guan 已提交
168 169
  tsTrans.peerRpc = rpcOpen(&rpcInit);
  if (tsTrans.peerRpc == NULL) {
170 171 172 173 174 175 176 177
    dError("failed to init peer rpc server");
    return -1;
  }

  dInfo("dnode peer rpc server is initialized");
  return 0;
}

S
Shengliang Guan 已提交
178 179 180 181
static void dnodeCleanupPeerServer() {
  if (tsTrans.peerRpc) {
    rpcClose(tsTrans.peerRpc);
    tsTrans.peerRpc = NULL;
182 183 184 185
    dInfo("dnode peer server is closed");
  }
}

S
Shengliang Guan 已提交
186 187 188 189
static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
  int32_t msgType = pMsg->msgType;

  if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
190
    if (pMsg == NULL || pMsg->pCont == NULL) return;
S
Shengliang Guan 已提交
191
    dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
192 193 194 195
    rpcFreeCont(pMsg->pCont);
    return;
  }

S
Shengliang Guan 已提交
196
  MsgFp fp = tsTrans.msgFp[msgType];
197
  if (fp != NULL) {
S
Shengliang Guan 已提交
198 199
    dTrace("RPC %p, peer rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code));
    (*fp)(pMsg, pEpSet);
200
  } else {
S
Shengliang Guan 已提交
201
    dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
202 203 204 205 206
  }

  rpcFreeCont(pMsg->pCont);
}

S
Shengliang Guan 已提交
207
static int32_t dnodeInitClient() {
S
Shengliang Guan 已提交
208 209
  char secret[TSDB_KEY_LEN] = "secret";

210 211
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
S
Shengliang Guan 已提交
212
  rpcInit.label = "DND-C";
213
  rpcInit.numOfThreads = 1;
S
Shengliang Guan 已提交
214 215 216 217 218 219 220
  rpcInit.cfp = dnodeProcessPeerRsp;
  rpcInit.sessions = TSDB_MAX_VNODES << 4;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = "t";
  rpcInit.ckey = "key";
  rpcInit.secret = secret;
221

222 223
  tsTrans.clientRpc = rpcOpen(&rpcInit);
  if (tsTrans.clientRpc == NULL) {
224 225 226 227 228 229 230 231
    dError("failed to init peer rpc client");
    return -1;
  }

  dInfo("dnode peer rpc client is initialized");
  return 0;
}

S
Shengliang Guan 已提交
232
static void dnodeCleanupClient() {
233 234 235
  if (tsTrans.clientRpc) {
    rpcClose(tsTrans.clientRpc);
    tsTrans.clientRpc = NULL;
236 237 238 239
    dInfo("dnode peer rpc client is closed");
  }
}

S
Shengliang Guan 已提交
240
static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
S
Shengliang Guan 已提交
241
  SRpcMsg rspMsg = {.handle = pMsg->handle};
S
Shengliang Guan 已提交
242
  int32_t msgType = pMsg->msgType;
243

S
Shengliang Guan 已提交
244 245 246 247
  if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
    dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]);
    rspMsg.code = TSDB_CODE_DND_EXITING;
    rpcSendResponse(&rspMsg);
248 249
    rpcFreeCont(pMsg->pCont);
    return;
S
Shengliang Guan 已提交
250 251 252 253
  } else if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
    dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
    rspMsg.code = TSDB_CODE_APP_NOT_READY;
    rpcSendResponse(&rspMsg);
254 255 256 257
    rpcFreeCont(pMsg->pCont);
    return;
  }

S
Shengliang Guan 已提交
258 259 260 261 262 263
  if (pMsg->pCont == NULL) {
    rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN;
    rpcSendResponse(&rspMsg);
    return;
  }

S
Shengliang Guan 已提交
264
  MsgFp fp = tsTrans.msgFp[msgType];
265
  if (fp != NULL) {
S
Shengliang Guan 已提交
266
    dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]);
S
Shengliang Guan 已提交
267
    (*fp)(pMsg, pEpSet);
268
  } else {
S
Shengliang Guan 已提交
269 270 271
    dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[msgType]);
    rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&rspMsg);
272 273 274 275
    rpcFreeCont(pMsg->pCont);
  }
}

S
Shengliang Guan 已提交
276
static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
277
  SRpcEpSet epSet = {0};
S
Shengliang Guan 已提交
278
  dnodeGetMnodeEpSetForPeer(&epSet);
279
  rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp);
280 281
}

282 283
static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
284 285 286 287 288 289
  if (code != TSDB_CODE_APP_NOT_READY) return code;

  SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
  tstrncpy(pMsg->user, user, sizeof(pMsg->user));

  dDebug("user:%s, send auth msg to mnodes", user);
S
Shengliang Guan 已提交
290
  SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TSDB_MSG_TYPE_AUTH};
291
  SRpcMsg rpcRsp = {0};
292
  dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
293 294 295 296 297

  if (rpcRsp.code != 0) {
    dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
  } else {
    dDebug("user:%s, auth msg received from mnodes", user);
S
Shengliang Guan 已提交
298
    SAuthRsp *pRsp = rpcRsp.pCont;
299 300 301 302 303 304 305 306 307 308
    memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
    memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
    *spi = pRsp->spi;
    *encrypt = pRsp->encrypt;
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

S
Shengliang Guan 已提交
309
static int32_t dnodeInitShellServer() {
310 311 312 313 314 315 316
  int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
  if (numOfThreads < 1) {
    numOfThreads = 1;
  }

  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
S
Shengliang Guan 已提交
317 318
  rpcInit.localPort = tsDnodeShellPort;
  rpcInit.label = "SHELL";
319
  rpcInit.numOfThreads = numOfThreads;
S
Shengliang Guan 已提交
320 321 322 323 324
  rpcInit.cfp = dnodeProcessShellReq;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.afp = dnodeRetrieveUserAuthInfo;
325

326 327
  tsTrans.shellRpc = rpcOpen(&rpcInit);
  if (tsTrans.shellRpc == NULL) {
328 329 330 331 332 333 334 335
    dError("failed to init shell rpc server");
    return -1;
  }

  dInfo("dnode shell rpc server is initialized");
  return 0;
}

S
Shengliang Guan 已提交
336
static void dnodeCleanupShellServer() {
337 338 339
  if (tsTrans.shellRpc) {
    rpcClose(tsTrans.shellRpc);
    tsTrans.shellRpc = NULL;
340 341 342
  }
}

343 344
int32_t dnodeInitTrans() {
  if (dnodeInitClient() != 0) {
345 346 347
    return -1;
  }

S
Shengliang Guan 已提交
348
  if (dnodeInitPeerServer() != 0) {
349 350 351
    return -1;
  }

S
Shengliang Guan 已提交
352
  if (dnodeInitShellServer() != 0) {
353 354 355 356 357 358
    return -1;
  }

  return 0;
}

359
void dnodeCleanupTrans() {
S
Shengliang Guan 已提交
360 361
  dnodeCleanupShellServer();
  dnodeCleanupPeerServer();
362
  dnodeCleanupClient();
363
}
S
Shengliang Guan 已提交
364 365 366 367 368 369 370 371

void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); }

void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
  SRpcEpSet epSet = {0};
  dnodeGetMnodeEpSetForPeer(&epSet);
  dnodeSendMsgToDnode(&epSet, rpcMsg);
}