/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

/* this file is mainly responsible for the communication between DNODEs. Each
 * dnode works as both server and client. Dnode may send status, grant, config
 * messages to mnode, mnode may send create/alter/drop table/vnode messages
 * to dnode. All these messages are handled from here
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
23 24 25 26
#include "dnodeTransport.h"
#include "dnodeDnode.h"
#include "dnodeMnode.h"
#include "dnodeVnodes.h"
S
Shengliang Guan 已提交
27

28
static struct {
S
Shengliang Guan 已提交
29
  void *peerRpc;
S
Shengliang Guan 已提交
30
  void *shellRpc;
S
Shengliang Guan 已提交
31
  void *clientRpc;
S
Shengliang Guan 已提交
32
  MsgFp msgFp[TSDB_MSG_TYPE_MAX];
33 34
} tsTrans;

S
Shengliang Guan 已提交
35 36
static void dnodeInitMsgFp() {
  // msg from client to dnode
S
Shengliang Guan 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
  tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodeQueryMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodeFetchMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodeQueryMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodeQueryMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodeQueryMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodeQueryMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodeWriteMsg;
  
S
Shengliang Guan 已提交
53
  // msg from client to mnode
S
Shengliang Guan 已提交
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
  tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeReadMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeReadMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeReadMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = dnodeProcessMnodeReadMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = dnodeProcessMnodeReadMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeReadMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
86

S
Shengliang Guan 已提交
87 88 89 90 91
  // message from client to dnode
  tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg;

  // message from mnode to vnode
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodeWriteMsg;
S
Shengliang Guan 已提交
92
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
93
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN] = dnodeProcessVnodeWriteMsg;
S
Shengliang Guan 已提交
94
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
95
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessVnodeWriteMsg;
S
Shengliang Guan 已提交
96
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
97 98 99

  // message from mnode to dnode
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
S
Shengliang Guan 已提交
100
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
101
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
S
Shengliang Guan 已提交
102
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
103
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
S
Shengliang Guan 已提交
104
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
105
  tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
S
Shengliang Guan 已提交
106
  tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
107
  tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
S
Shengliang Guan 已提交
108
  tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN_RSP] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
109
  tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
S
Shengliang Guan 已提交
110 111 112 113 114 115 116
  tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMgmtMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_MNODE_IN] = dnodeProcessMnodeMgmtMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_MNODE_IN_RSP] = dnodeProcessMnodeWriteMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dnodeProcessMnodeMgmtMsg;
  tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
117
  tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = dnodeProcessDnodeMsg;
S
Shengliang Guan 已提交
118
  tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
119 120

  // message from dnode to mnode
S
Shengliang Guan 已提交
121
  tsTrans.msgFp[TSDB_MSG_TYPE_AUTH] = dnodeProcessMnodeReadMsg;
S
Shengliang Guan 已提交
122
  tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dnodeProcessDnodeMsg;
S
Shengliang Guan 已提交
123
  tsTrans.msgFp[TSDB_MSG_TYPE_GRANT] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
124
  tsTrans.msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dnodeProcessDnodeMsg;
S
Shengliang Guan 已提交
125
  tsTrans.msgFp[TSDB_MSG_TYPE_STATUS] = dnodeProcessMnodeWriteMsg;
S
Shengliang Guan 已提交
126
  tsTrans.msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessDnodeMsg;
S
Shengliang Guan 已提交
127 128
}

S
Shengliang Guan 已提交
129
/*
 * Request callback of the peer RPC server (installed as rpcInit.cfp by
 * dnodeInitPeerServer).
 *
 * - TSDB_MSG_TYPE_NETWORK_TEST goes straight to dnodeProcessDnodeMsg, before
 *   the run state is looked at.
 * - While the dnode is not in DN_RUN_STAT_RUNNING the request is answered
 *   with TSDB_CODE_APP_NOT_READY and its content is freed.
 * - A request without content is answered with TSDB_CODE_DND_INVALID_MSG_LEN.
 * - Otherwise the handler registered in tsTrans.msgFp is invoked; when none
 *   is registered the request is answered with TSDB_CODE_DND_MSG_NOT_PROCESSED
 *   and its content is freed.
 */
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SEpSet *pEpSet) {
  int32_t type = pMsg->msgType;
  SRpcMsg rsp = {0};
  rsp.handle = pMsg->handle;

  if (type == TSDB_MSG_TYPE_NETWORK_TEST) {
    dnodeProcessDnodeMsg(pMsg, pEpSet);
    return;
  }

  if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
    rsp.code = TSDB_CODE_APP_NOT_READY;
    rpcSendResponse(&rsp);
    rpcFreeCont(pMsg->pCont);
    dTrace("RPC %p, peer req:%s is ignored since dnode not running", pMsg->handle, taosMsg[type]);
    return;
  }

  if (pMsg->pCont == NULL) {
    rsp.code = TSDB_CODE_DND_INVALID_MSG_LEN;
    rpcSendResponse(&rsp);
    return;
  }

  MsgFp handler = tsTrans.msgFp[type];
  if (handler == NULL) {
    dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[type]);
    rsp.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&rsp);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[type]);
  (*handler)(pMsg, pEpSet);
}

S
Shengliang Guan 已提交
164
static int32_t dnodeInitPeerServer() {
165 166 167 168 169
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = tsDnodeDnodePort;
  rpcInit.label = "DND-S";
  rpcInit.numOfThreads = 1;
170
  rpcInit.cfp = dnodeProcessPeerReq;
171 172 173 174
  rpcInit.sessions = TSDB_MAX_VNODES << 4;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;

S
Shengliang Guan 已提交
175 176
  tsTrans.peerRpc = rpcOpen(&rpcInit);
  if (tsTrans.peerRpc == NULL) {
177 178 179 180 181 182 183 184
    dError("failed to init peer rpc server");
    return -1;
  }

  dInfo("dnode peer rpc server is initialized");
  return 0;
}

S
Shengliang Guan 已提交
185 186 187 188
/*
 * Close the peer RPC server if it is open and clear tsTrans.peerRpc, so a
 * repeated call does nothing.
 */
static void dnodeCleanupPeerServer() {
  if (tsTrans.peerRpc == NULL) return;

  rpcClose(tsTrans.peerRpc);
  tsTrans.peerRpc = NULL;
  dInfo("dnode peer server is closed");
}

S
Shengliang Guan 已提交
193
/*
 * Response callback of the client RPC endpoint (installed as rpcInit.cfp by
 * dnodeInitClient).
 *
 * When the dnode is in DN_RUN_STAT_STOPPED the response is dropped and its
 * content, if any, is freed. Otherwise the handler registered in
 * tsTrans.msgFp for the message type is invoked (a missing handler is only
 * logged), and the content is freed afterwards in either case.
 */
static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SEpSet *pEpSet) {
  // Reject a NULL message before the first dereference; a check placed after
  // reading pMsg->msgType cannot protect anything.
  if (pMsg == NULL) return;

  int32_t msgType = pMsg->msgType;

  if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
    if (pMsg->pCont == NULL) return;
    dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  MsgFp fp = tsTrans.msgFp[msgType];
  if (fp != NULL) {
    dTrace("RPC %p, peer rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code));
    (*fp)(pMsg, pEpSet);
  } else {
    dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
  }

  rpcFreeCont(pMsg->pCont);
}

S
Shengliang Guan 已提交
214
static int32_t dnodeInitClient() {
S
Shengliang Guan 已提交
215 216
  char secret[TSDB_KEY_LEN] = "secret";

217 218
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
S
Shengliang Guan 已提交
219
  rpcInit.label = "DND-C";
220
  rpcInit.numOfThreads = 1;
S
Shengliang Guan 已提交
221 222 223 224 225 226 227
  rpcInit.cfp = dnodeProcessPeerRsp;
  rpcInit.sessions = TSDB_MAX_VNODES << 4;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = "t";
  rpcInit.ckey = "key";
  rpcInit.secret = secret;
228

229 230
  tsTrans.clientRpc = rpcOpen(&rpcInit);
  if (tsTrans.clientRpc == NULL) {
231 232 233 234 235 236 237 238
    dError("failed to init peer rpc client");
    return -1;
  }

  dInfo("dnode peer rpc client is initialized");
  return 0;
}

S
Shengliang Guan 已提交
239
static void dnodeCleanupClient() {
240 241 242
  if (tsTrans.clientRpc) {
    rpcClose(tsTrans.clientRpc);
    tsTrans.clientRpc = NULL;
243 244 245 246
    dInfo("dnode peer rpc client is closed");
  }
}

S
Shengliang Guan 已提交
247
/*
 * Request callback of the shell RPC server (installed as rpcInit.cfp by
 * dnodeInitShellServer).
 *
 * - In DN_RUN_STAT_STOPPED the request is answered with TSDB_CODE_DND_EXITING.
 * - In any other state except DN_RUN_STAT_RUNNING it is answered with
 *   TSDB_CODE_APP_NOT_READY.
 * - A request without content is answered with TSDB_CODE_DND_INVALID_MSG_LEN.
 * - Otherwise the handler registered in tsTrans.msgFp is invoked; when none
 *   is registered the request is answered with TSDB_CODE_DND_MSG_NOT_PROCESSED.
 *
 * Every rejection path except the missing-content one frees the request
 * content.
 */
static void dnodeProcessShellReq(SRpcMsg *pMsg, SEpSet *pEpSet) {
  int32_t type = pMsg->msgType;
  SRpcMsg rsp = {0};
  rsp.handle = pMsg->handle;

  if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
    dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[type]);
    rsp.code = TSDB_CODE_DND_EXITING;
    rpcSendResponse(&rsp);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  // The run state is queried again here rather than reused from the check above.
  if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
    dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[type]);
    rsp.code = TSDB_CODE_APP_NOT_READY;
    rpcSendResponse(&rsp);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  if (pMsg->pCont == NULL) {
    rsp.code = TSDB_CODE_DND_INVALID_MSG_LEN;
    rpcSendResponse(&rsp);
    return;
  }

  MsgFp handler = tsTrans.msgFp[type];
  if (handler == NULL) {
    dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[type]);
    rsp.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
    rpcSendResponse(&rsp);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[type]);
  (*handler)(pMsg, pEpSet);
}

S
Shengliang Guan 已提交
283
/*
 * Send rpcMsg through the client RPC endpoint to the endpoint set returned by
 * dnodeGetMnodeEpSetForPeer(), using rpcSendRecv() with rpcRsp as the
 * response argument.
 */
static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
  SEpSet mnodeEpSet = {0};

  dnodeGetMnodeEpSetForPeer(&mnodeEpSet);
  rpcSendRecv(tsTrans.clientRpc, &mnodeEpSet, rpcMsg, rpcRsp);
}

289
static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
S
Shengliang Guan 已提交
290
  int32_t code = dnodeGetUserAuthFromMnode(user, spi, encrypt, secret, ckey);
291 292 293 294 295 296
  if (code != TSDB_CODE_APP_NOT_READY) return code;

  SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
  tstrncpy(pMsg->user, user, sizeof(pMsg->user));

  dDebug("user:%s, send auth msg to mnodes", user);
S
Shengliang Guan 已提交
297
  SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TSDB_MSG_TYPE_AUTH};
298
  SRpcMsg rpcRsp = {0};
299
  dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
300 301 302 303 304

  if (rpcRsp.code != 0) {
    dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
  } else {
    dDebug("user:%s, auth msg received from mnodes", user);
S
Shengliang Guan 已提交
305
    SAuthRsp *pRsp = rpcRsp.pCont;
306 307 308 309 310 311 312 313 314 315
    memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
    memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
    *spi = pRsp->spi;
    *encrypt = pRsp->encrypt;
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

S
Shengliang Guan 已提交
316
static int32_t dnodeInitShellServer() {
317 318 319 320 321 322 323
  int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
  if (numOfThreads < 1) {
    numOfThreads = 1;
  }

  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
S
Shengliang Guan 已提交
324 325
  rpcInit.localPort = tsDnodeShellPort;
  rpcInit.label = "SHELL";
326
  rpcInit.numOfThreads = numOfThreads;
S
Shengliang Guan 已提交
327 328 329 330 331
  rpcInit.cfp = dnodeProcessShellReq;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.afp = dnodeRetrieveUserAuthInfo;
332

333 334
  tsTrans.shellRpc = rpcOpen(&rpcInit);
  if (tsTrans.shellRpc == NULL) {
335 336 337 338 339 340 341 342
    dError("failed to init shell rpc server");
    return -1;
  }

  dInfo("dnode shell rpc server is initialized");
  return 0;
}

S
Shengliang Guan 已提交
343
static void dnodeCleanupShellServer() {
344 345 346
  if (tsTrans.shellRpc) {
    rpcClose(tsTrans.shellRpc);
    tsTrans.shellRpc = NULL;
347 348 349
  }
}

350 351
/*
 * Bring up the transport layer: fill the message dispatch table, then open
 * the client endpoint, the peer server and the shell server, in that order.
 *
 * Returns 0 on success. Returns -1 as soon as one endpoint fails to open;
 * endpoints opened earlier in the same call are closed again before
 * returning, so a failed init leaves no RPC handle behind.
 */
int32_t dnodeInitTrans() {
  // The table must be filled before any endpoint can deliver a message;
  // with an empty table every callback takes its "not processed" path.
  dnodeInitMsgFp();

  if (dnodeInitClient() != 0) {
    return -1;
  }

  if (dnodeInitPeerServer() != 0) {
    dnodeCleanupClient();
    return -1;
  }

  if (dnodeInitShellServer() != 0) {
    dnodeCleanupPeerServer();
    dnodeCleanupClient();
    return -1;
  }

  return 0;
}

366
/*
 * Shut the transport layer down: the shell server first, then the peer
 * server, then the client endpoint. Each step is skipped when its handle is
 * already NULL.
 */
void dnodeCleanupTrans() {
  dnodeCleanupShellServer();
  dnodeCleanupPeerServer();
  dnodeCleanupClient();
}
S
Shengliang Guan 已提交
371

S
Shengliang Guan 已提交
372
/*
 * Hand rpcMsg to rpcSendRequest() on the client RPC endpoint, addressed to
 * epSet. The last argument of rpcSendRequest() is passed as NULL.
 */
void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg) {
  rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL);
}
S
Shengliang Guan 已提交
373 374

/*
 * Send rpcMsg to the endpoint set returned by dnodeGetMnodeEpSetForPeer(),
 * going through dnodeSendMsgToDnode().
 */
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
  SEpSet mnodeEpSet = {0};

  dnodeGetMnodeEpSetForPeer(&mnodeEpSet);
  dnodeSendMsgToDnode(&mnodeEpSet, rpcMsg);
}