dnodeTransport.c 13.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

/* This file is mainly responsible for the communication between dnodes. Each
 * dnode works as both server and client. A dnode may send status, grant, and
 * config messages to the mnode; the mnode may send create/alter/drop
 * table/vnode messages to a dnode. All these messages are handled here.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
23 24 25 26
#include "dnodeTransport.h"
#include "dnodeDnode.h"
#include "dnodeMnode.h"
#include "dnodeVnodes.h"
S
Shengliang Guan 已提交
27

28
static struct {
S
Shengliang Guan 已提交
29
  void *peerRpc;
S
Shengliang Guan 已提交
30
  void *shellRpc;
S
Shengliang Guan 已提交
31
  void *clientRpc;
S
Shengliang Guan 已提交
32
  MsgFp msgFp[TSDB_MSG_TYPE_MAX];
33 34
} tsTrans;

S
Shengliang Guan 已提交
35 36
static void dnodeInitMsgFp() {
  // msg from client to dnode
S
Shengliang Guan 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
  tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodeQueryMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodeFetchMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodeQueryMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodeQueryMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodeQueryMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodeQueryMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodeWriteMsg;
  
S
Shengliang Guan 已提交
53
  // msg from client to mnode
S
Shengliang Guan 已提交
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
  tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
78
  tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
79 80 81 82 83 84 85
  tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
86

S
Shengliang Guan 已提交
87 88 89 90 91
  // message from client to dnode
  tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg;

  // message from mnode to vnode
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodeWriteMsg;
S
Shengliang Guan 已提交
92
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
93
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN] = dnodeProcessVnodeWriteMsg;
S
Shengliang Guan 已提交
94
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
95 96 97 98 99
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessMnodeMsg;

  // message from mnode to dnode
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
S
Shengliang Guan 已提交
100
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
101
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
S
Shengliang Guan 已提交
102
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
103
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
S
Shengliang Guan 已提交
104
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
105
  tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
S
Shengliang Guan 已提交
106
  tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
107 108 109
  tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
S
Shengliang Guan 已提交
110 111 112 113 114 115 116
  tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = dnodeProcessDnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dnodeProcessMnodeMsg;
S
Shengliang Guan 已提交
117 118

  // message from dnode to mnode
S
Shengliang Guan 已提交
119 120 121 122 123 124
  tsTrans.msgFp[TSDB_MSG_TYPE_AUTH] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dnodeProcessDnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_GRANT] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dnodeProcessDnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_STATUS] = dnodeProcessMnodeMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessDnodeMsg;
S
Shengliang Guan 已提交
125 126
}

S
Shengliang Guan 已提交
127
/*
 * Request callback installed as `cfp` of the peer RPC server.
 *
 * Network-test messages go straight to dnodeProcessDnodeMsg(). Every other
 * message is answered with an error when the dnode is not running, when it
 * carries no content, or when no handler is registered for its type;
 * otherwise it is handed to the handler found in tsTrans.msgFp. On the
 * dispatch paths this function does not free pMsg->pCont itself.
 */
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SEpSet *pEpSet) {
  int32_t msgType = pMsg->msgType;
  SRpcMsg reply = {.handle = pMsg->handle};

  // network tests are served before the run state is checked
  if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
    dnodeProcessDnodeMsg(pMsg, pEpSet);
    return;
  }

  if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
    reply.code = TSDB_CODE_APP_NOT_READY;
    rpcSendResponse(&reply);
    rpcFreeCont(pMsg->pCont);
    dTrace("RPC %p, peer req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
    return;
  }

  // nothing to free here: there is no content
  if (pMsg->pCont == NULL) {
    reply.code = TSDB_CODE_DND_INVALID_MSG_LEN;
    rpcSendResponse(&reply);
    return;
  }

  MsgFp handler = tsTrans.msgFp[msgType];
  if (handler == NULL) {
    dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[msgType]);
    reply.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&reply);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]);
  handler(pMsg, pEpSet);
}

S
Shengliang Guan 已提交
162
static int32_t dnodeInitPeerServer() {
163 164 165 166 167
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = tsDnodeDnodePort;
  rpcInit.label = "DND-S";
  rpcInit.numOfThreads = 1;
168
  rpcInit.cfp = dnodeProcessPeerReq;
169 170 171 172
  rpcInit.sessions = TSDB_MAX_VNODES << 4;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;

S
Shengliang Guan 已提交
173 174
  tsTrans.peerRpc = rpcOpen(&rpcInit);
  if (tsTrans.peerRpc == NULL) {
175 176 177 178 179 180 181 182
    dError("failed to init peer rpc server");
    return -1;
  }

  dInfo("dnode peer rpc server is initialized");
  return 0;
}

S
Shengliang Guan 已提交
183 184 185 186
/* Close the peer RPC server if it is open and clear its handle. */
static void dnodeCleanupPeerServer() {
  if (!tsTrans.peerRpc) return;

  rpcClose(tsTrans.peerRpc);
  tsTrans.peerRpc = NULL;
  dInfo("dnode peer server is closed");
}

S
Shengliang Guan 已提交
191
/*
 * Response callback installed as `cfp` of the RPC client.
 *
 * While the dnode is stopped the response is dropped and its content freed.
 * Otherwise the response is passed to the handler registered for its type in
 * tsTrans.msgFp, if there is one, and the content is freed afterwards in
 * either case.
 */
static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SEpSet *pEpSet) {
  // the NULL check must come before the first read of pMsg->msgType
  if (pMsg == NULL) return;

  int32_t msgType = pMsg->msgType;

  if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
    if (pMsg->pCont == NULL) return;
    dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  MsgFp fp = tsTrans.msgFp[msgType];
  if (fp != NULL) {
    dTrace("RPC %p, peer rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code));
    (*fp)(pMsg, pEpSet);
  } else {
    dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
  }

  rpcFreeCont(pMsg->pCont);
}

S
Shengliang Guan 已提交
212
static int32_t dnodeInitClient() {
S
Shengliang Guan 已提交
213 214
  char secret[TSDB_KEY_LEN] = "secret";

215 216
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
S
Shengliang Guan 已提交
217
  rpcInit.label = "DND-C";
218
  rpcInit.numOfThreads = 1;
S
Shengliang Guan 已提交
219 220 221 222 223 224 225
  rpcInit.cfp = dnodeProcessPeerRsp;
  rpcInit.sessions = TSDB_MAX_VNODES << 4;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = "t";
  rpcInit.ckey = "key";
  rpcInit.secret = secret;
226

227 228
  tsTrans.clientRpc = rpcOpen(&rpcInit);
  if (tsTrans.clientRpc == NULL) {
229 230 231 232 233 234 235 236
    dError("failed to init peer rpc client");
    return -1;
  }

  dInfo("dnode peer rpc client is initialized");
  return 0;
}

S
Shengliang Guan 已提交
237
static void dnodeCleanupClient() {
238 239 240
  if (tsTrans.clientRpc) {
    rpcClose(tsTrans.clientRpc);
    tsTrans.clientRpc = NULL;
241 242 243 244
    dInfo("dnode peer rpc client is closed");
  }
}

S
Shengliang Guan 已提交
245
/*
 * Request callback installed as `cfp` of the shell RPC server.
 *
 * The request is answered with an error when the dnode is stopped, when it is
 * not running, when the message has no content, or when no handler is
 * registered for its type; otherwise it is handed to the handler found in
 * tsTrans.msgFp. On the dispatch path this function does not free
 * pMsg->pCont itself.
 */
static void dnodeProcessShellReq(SRpcMsg *pMsg, SEpSet *pEpSet) {
  int32_t msgType = pMsg->msgType;
  SRpcMsg reply = {.handle = pMsg->handle};

  if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
    dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]);
    reply.code = TSDB_CODE_DND_EXITING;
    rpcSendResponse(&reply);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  // any remaining state other than "running" gets a different code than "stopped"
  if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
    dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
    reply.code = TSDB_CODE_APP_NOT_READY;
    rpcSendResponse(&reply);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  // nothing to free here: there is no content
  if (pMsg->pCont == NULL) {
    reply.code = TSDB_CODE_DND_INVALID_MSG_LEN;
    rpcSendResponse(&reply);
    return;
  }

  MsgFp handler = tsTrans.msgFp[msgType];
  if (handler == NULL) {
    dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[msgType]);
    reply.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&reply);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]);
  handler(pMsg, pEpSet);
}

S
Shengliang Guan 已提交
281
/*
 * Send rpcMsg through the RPC client to the endpoint set obtained from
 * dnodeGetMnodeEpSetForPeer(), using rpcSendRecv() with rpcRsp as the
 * response argument.
 */
static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
  SEpSet mnodeEpSet = {0};

  dnodeGetMnodeEpSetForPeer(&mnodeEpSet);
  rpcSendRecv(tsTrans.clientRpc, &mnodeEpSet, rpcMsg, rpcRsp);
}

287
static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
S
Shengliang Guan 已提交
288
  int32_t code = dnodeGetUserAuthFromMnode(user, spi, encrypt, secret, ckey);
289 290 291 292 293 294
  if (code != TSDB_CODE_APP_NOT_READY) return code;

  SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
  tstrncpy(pMsg->user, user, sizeof(pMsg->user));

  dDebug("user:%s, send auth msg to mnodes", user);
S
Shengliang Guan 已提交
295
  SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TSDB_MSG_TYPE_AUTH};
296
  SRpcMsg rpcRsp = {0};
297
  dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
298 299 300 301 302

  if (rpcRsp.code != 0) {
    dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
  } else {
    dDebug("user:%s, auth msg received from mnodes", user);
S
Shengliang Guan 已提交
303
    SAuthRsp *pRsp = rpcRsp.pCont;
304 305 306 307 308 309 310 311 312 313
    memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
    memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
    *spi = pRsp->spi;
    *encrypt = pRsp->encrypt;
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

S
Shengliang Guan 已提交
314
static int32_t dnodeInitShellServer() {
315 316 317 318 319 320 321
  int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
  if (numOfThreads < 1) {
    numOfThreads = 1;
  }

  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
S
Shengliang Guan 已提交
322 323
  rpcInit.localPort = tsDnodeShellPort;
  rpcInit.label = "SHELL";
324
  rpcInit.numOfThreads = numOfThreads;
S
Shengliang Guan 已提交
325 326 327 328 329
  rpcInit.cfp = dnodeProcessShellReq;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.afp = dnodeRetrieveUserAuthInfo;
330

331 332
  tsTrans.shellRpc = rpcOpen(&rpcInit);
  if (tsTrans.shellRpc == NULL) {
333 334 335 336 337 338 339 340
    dError("failed to init shell rpc server");
    return -1;
  }

  dInfo("dnode shell rpc server is initialized");
  return 0;
}

S
Shengliang Guan 已提交
341
static void dnodeCleanupShellServer() {
342 343 344
  if (tsTrans.shellRpc) {
    rpcClose(tsTrans.shellRpc);
    tsTrans.shellRpc = NULL;
345 346 347
  }
}

348 349
/*
 * Initialize the transport layer: fill the message dispatch table, then open
 * the RPC client, the peer server and the shell server, in that order.
 *
 * Returns 0 on success, -1 as soon as one of the three fails to open. Nothing
 * is closed here on failure; dnodeCleanupTrans() closes whatever was opened.
 */
int32_t dnodeInitTrans() {
  // The table must be filled before any server can deliver a request;
  // without this call every entry of tsTrans.msgFp stays NULL and all
  // requests are answered as "not processed".
  dnodeInitMsgFp();

  if (dnodeInitClient() != 0) {
    return -1;
  }

  if (dnodeInitPeerServer() != 0) {
    return -1;
  }

  if (dnodeInitShellServer() != 0) {
    return -1;
  }

  return 0;
}

364
/*
 * Shut the transport layer down: the shell server is closed first, then the
 * peer server, then the RPC client. Each step does nothing if its handle is
 * not open.
 */
void dnodeCleanupTrans() {
  // servers first
  dnodeCleanupShellServer();
  dnodeCleanupPeerServer();

  // client last
  dnodeCleanupClient();
}
S
Shengliang Guan 已提交
369

S
Shengliang Guan 已提交
370
/*
 * Send rpcMsg to the endpoint set `epSet` through the RPC client, using
 * rpcSendRequest() with NULL as its last argument.
 */
void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg) {
  rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL);
}
S
Shengliang Guan 已提交
371 372

/*
 * Send rpcMsg to the endpoint set obtained from dnodeGetMnodeEpSetForPeer(),
 * via dnodeSendMsgToDnode().
 */
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
  SEpSet mnodeEpSet = {0};

  dnodeGetMnodeEpSetForPeer(&mnodeEpSet);
  dnodeSendMsgToDnode(&mnodeEpSet, rpcMsg);
}