dmTransport.c 14.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
18 19
#include "qworker.h"

20 21
// Credentials of the dnode's built-in internal account.
// INTERNAL_USER is installed as the rpc client user in dmInitClient and is
// matched by name in dmGetHideUserAuth.
#define INTERNAL_USER   "_dnd"
// Installed as the rpc client ckey in dmInitClient.
#define INTERNAL_CKEY   "_key"
S
Shengliang 已提交
22
// Plain-text secret; it is passed through taosEncryptPass_c before use.
#define INTERNAL_SECRET "_pwd"
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24
// Copy the dnode's cached mnode endpoint set into *pEpSet.
// The copy is taken while holding the read side of pDnode->data.latch.
static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
  SDnodeData *pDnodeData = &pDnode->data;

  taosRLockLatch(&pDnodeData->latch);
  *pEpSet = pDnodeData->mnodeEps;
  taosRUnLockLatch(&pDnodeData->latch);
}

S
Shengliang Guan 已提交
31
// Replace the dnode's cached mnode endpoint set with *pEpSet and log the new
// set. The store and the per-endpoint log lines both happen while the write
// side of pDnode->data.latch is held.
static void dmSetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
  dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);

  SDnodeData *pDnodeData = &pDnode->data;
  taosWLockLatch(&pDnodeData->latch);

  pDnodeData->mnodeEps = *pEpSet;

  int32_t idx = 0;
  while (idx < pEpSet->numOfEps) {
    dInfo("mnode index:%d %s:%u", idx, pEpSet->eps[idx].fqdn, pEpSet->eps[idx].port);
    ++idx;
  }

  taosWUnLockLatch(&pDnodeData->latch);
}
43

S
Shengliang Guan 已提交
44
// Fill *pMsg from the raw rpc message *pRpc and attach connection details.
//
// For a request, the connection info is looked up from pRpc->info.handle; if
// that lookup fails, terrno is set to TSDB_CODE_MND_NO_USER_FROM_CONN and -1
// is returned without touching *pMsg. For anything else the connection fields
// are filled from a zeroed SRpcConnInfo.
//
// Returns 0 on success, -1 on failure.
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
  SRpcConnInfo connInfo = {0};

  bool lookupFailed = IsReq(pRpc) && rpcGetConnInfo(pRpc->info.handle, &connInfo) != 0;
  if (lookupFailed) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
    return -1;
  }

  // Start from a byte copy of the rpc message, then overlay the conn fields.
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));

  pMsg->conn.clientIp = connInfo.clientIp;
  pMsg->conn.clientPort = connInfo.clientPort;
  memcpy(pMsg->conn.user, connInfo.user, TSDB_USER_LEN);

  return 0;
}

S
Shengliang Guan 已提交
59 60
// Dispatch pMsg to the handler that pWrapper registered for its message type.
//
// Returns the handler's result. If no handler is registered, sets terrno to
// TSDB_CODE_MSG_NOT_PROCESSED and returns -1.
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];

  if (handler != NULL) {
    dTrace("msg:%p, will be processed by %s, handle:%p", pMsg, pWrapper->name, pMsg->info.handle);
    return handler(pWrapper->pMgmt, pMsg);
  }

  terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  return -1;
}

// Transport callback (installed as rpcInit.cfp by dmInitClient/dmInitServer)
// that routes one incoming rpc message.
//
// Flow:
//   1. TDMT_DND_NET_TEST and the fetch/retrieve responses are handed to their
//      dedicated processors and the function returns.
//   2. While the dnode is not DND_STAT_RUNNING only TDMT_DND_SERVER_STATUS is
//      processed; everything else is answered with TSDB_CODE_APP_NOT_READY.
//   3. Otherwise the target wrapper is chosen from the message handle table
//      (optionally overridden by the vgId in the message head), the message is
//      copied into a queue item and either pushed to the child-process queue
//      or processed in place.
//
// On any failure after step 2 a response carrying the error code is sent for
// requests, and both the queue item and pRpc->pCont are released here.
//
// pEpSet is not used by this function.
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SDnodeTrans  *pTrans = &pDnode->trans;
  int32_t       code = -1;
  SRpcMsg      *pMsg = NULL;
  bool          needRelease = false;
  SMsgHandle   *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
  SMgmtWrapper *pWrapper = NULL;

  dTrace("msg:%s is received, handle:%p cont:%p len:%d code:0x%04x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
         pRpc->info.handle, pRpc->pCont, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);

  if (pRpc->msgType == TDMT_DND_NET_TEST) {
    dmProcessNetTestReq(pDnode, pRpc);
    return;
  } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
    // The result is intentionally ignored: nothing is reported back for a response.
    (void)qWorkerProcessFetchRsp(NULL, NULL, pRpc);
    pRpc->pCont = NULL;  // will be freed in qworker
    return;
  }

  if (pDnode->status != DND_STAT_RUNNING) {
    if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
      dmProcessServerStartupStatus(pDnode, pRpc);
    } else {
      SRpcMsg rspMsg = {
          .info.handle = pRpc->info.handle,
          .code = TSDB_CODE_APP_NOT_READY,
          .info.ahandle = pRpc->info.ahandle,
          .info.refId = pRpc->info.refId,
      };
      rpcSendResponse(&rspMsg);

      // The message is dropped here, so its content must be released just like
      // on the _OVER failure path; otherwise it is never freed.
      rpcFreeCont(pRpc->pCont);
      pRpc->pCont = NULL;
    }
    return;
  }

  if (IsReq(pRpc) && pRpc->pCont == NULL) {
    terrno = TSDB_CODE_INVALID_MSG_LEN;
    goto _OVER;
  }

  if (pHandle->defaultNtype == NODE_END) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  } else {
    pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
    if (pHandle->needCheckVgId) {
      // The vgId lives in the message head, so the content must be present and
      // at least as large as the head before it may be read.
      if (pRpc->pCont != NULL && pRpc->contLen >= (int32_t)sizeof(SMsgHead)) {
        SMsgHead *pHead = pRpc->pCont;
        int32_t   vgId = ntohl(pHead->vgId);
        if (vgId == QNODE_HANDLE) {
          pWrapper = &pDnode->wrappers[QNODE];
        } else if (vgId == MNODE_HANDLE) {
          pWrapper = &pDnode->wrappers[MNODE];
        } else {
          terrno = TSDB_CODE_INVALID_MSG;
          goto _OVER;
        }
      } else {
        terrno = TSDB_CODE_INVALID_MSG_LEN;
        goto _OVER;
      }
    }
  }

  if (dmMarkWrapper(pWrapper) != 0) {
    goto _OVER;
  } else {
    needRelease = true;
  }

  pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
  if (pMsg == NULL) {
    goto _OVER;
  }

  if (dmBuildNodeMsg(pMsg, pRpc) != 0) {
    goto _OVER;
  }

  if (InParentProc(pWrapper->proc.ptype)) {
    code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen,
                             (IsReq(pRpc) && (pRpc->code == 0)) ? pRpc->info.handle : NULL, pRpc->info.refId,
                             DND_FUNC_REQ);
  } else {
    code = dmProcessNodeMsg(pWrapper, pMsg);
  }

_OVER:
  if (code == 0) {
    // After a successful push to the child queue the local copies are released
    // here. When processed in place nothing is freed at this point
    // (presumably the handler now owns the item -- TODO confirm).
    if (pWrapper != NULL && InParentProc(pWrapper->proc.ptype)) {
      dTrace("msg:%p, is freed after push to cqueue", pMsg);
      taosFreeQitem(pMsg);
      rpcFreeCont(pRpc->pCont);
    }
  } else {
    dError("msg:%p, failed to process since %s", pMsg, terrstr());
    if (terrno != 0) code = terrno;

    if (IsReq(pRpc)) {
      // A mnode-range message that hit an undeployed/offline node is turned
      // into a redirect.
      if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
        if (pRpc->msgType > TDMT_MND_MSG && pRpc->msgType < TDMT_VND_MSG) {
          code = TSDB_CODE_NODE_REDIRECT;
        }
      }
      SRpcMsg rspMsg = {.code = code, .info = pRpc->info};
      tmsgSendRsp(&rspMsg);
    }

    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
  }

  if (needRelease) {
    dmReleaseWrapper(pWrapper);
  }
}

S
Shengliang Guan 已提交
189
// Build the message dispatch tables from the handler lists of every node type.
//
// For each node type in [DNODE, NODE_END) the wrapper's getHandlesFp() is
// called; every returned entry registers its function pointer in the wrapper's
// msgFps table. In the dnode-wide msgHandles table an entry either sets
// needCheckVgId (when it requests the vgId check) or records this node type as
// the default target.
//
// Returns 0 on success, -1 as soon as a handler list cannot be obtained.
int32_t dmInitMsgHandle(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];

    SArray *pHandles = (*pWrapper->func.getHandlesFp)();
    if (pHandles == NULL) {
      return -1;
    }

    int32_t idx = 0;
    while (idx < taosArrayGetSize(pHandles)) {
      SMgmtHandle *pMgmt = taosArrayGet(pHandles, idx);
      SMsgHandle  *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];

      if (pMgmt->needCheckVgId) {
        pHandle->needCheckVgId = pMgmt->needCheckVgId;
      } else {
        pHandle->defaultNtype = ntype;
      }

      pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
      ++idx;
    }

    taosArrayDestroy(pHandles);
  }

  return 0;
}

S
Shengliang Guan 已提交
215
// Answer pReq with TSDB_CODE_RPC_REDIRECT, carrying the current mnode endpoint
// set as the serialized response body.
//
// If one of the endpoints is this dnode itself (same fqdn and port), inUse is
// moved to the endpoint after it. Ports are converted with htons() before
// serialization.
static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) {
  SEpSet epSet = {0};
  dmGetMnodeEpSet(pDnode, &epSet);

  dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse);

  for (int32_t idx = 0; idx < epSet.numOfEps; ++idx) {
    dDebug("mnode index:%d %s:%u", idx, epSet.eps[idx].fqdn, epSet.eps[idx].port);

    bool isLocal =
        strcmp(epSet.eps[idx].fqdn, pDnode->data.localFqdn) == 0 && epSet.eps[idx].port == pDnode->data.serverPort;
    if (isLocal) {
      epSet.inUse = (idx + 1) % epSet.numOfEps;
    }

    epSet.eps[idx].port = htons(epSet.eps[idx].port);
  }

  SMEpSet msg = {.epSet = epSet};

  // First pass with a NULL buffer only computes the encoded length.
  int32_t bodyLen = tSerializeSMEpSet(NULL, 0, &msg);

  SRpcMsg rsp = {0};
  rsp.code = TSDB_CODE_RPC_REDIRECT;
  rsp.info = pReq->info;
  rsp.contLen = bodyLen;
  rsp.pCont = rpcMallocCont(bodyLen);

  tSerializeSMEpSet(rsp.pCont, bodyLen, &msg);
  rpcSendResponse(&rsp);
}

S
Shengliang Guan 已提交
242
// Send pRsp over rpc. A response whose code is TSDB_CODE_NODE_REDIRECT is
// replaced by a redirect response built by dmSendRpcRedirectRsp.
static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) {
  if (pRsp->code != TSDB_CODE_NODE_REDIRECT) {
    rpcSendResponse(pRsp);
    return;
  }

  dmSendRpcRedirectRsp(pDnode, pRsp);
}

S
Shengliang Guan 已提交
250
// Send pReq to pEpSet through the client rpc and receive the reply in pRsp.
//
// When the dnode is not DND_STAT_RUNNING nothing is sent: pRsp->code is set to
// TSDB_CODE_NODE_OFFLINE and the request content is freed and cleared.
static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
  if (pDnode->status == DND_STAT_RUNNING) {
    rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
    return;
  }

  pRsp->code = TSDB_CODE_NODE_OFFLINE;
  rpcFreeCont(pReq->pCont);
  pReq->pCont = NULL;
}

S
Shengliang Guan 已提交
260
// Send pReq to the currently known mnode endpoint set and receive the reply
// in pRsp (see dmSendRecv for the not-running case).
static inline void dmSendToMnodeRecv(SMgmtWrapper *pWrapper, SRpcMsg *pReq, SRpcMsg *pRsp) {
  SDnode *pDnode = pWrapper->pDnode;
  SEpSet  mnodeEps = {0};

  dmGetMnodeEpSet(pDnode, &mnodeEps);
  dmSendRecv(pDnode, &mnodeEps, pReq, pRsp);
}

S
Shengliang Guan 已提交
266
// Send pReq to pEpSet through the dnode's client rpc.
//
// Returns 0 once the request has been handed to rpcSendRequest. If the dnode is
// not running or has no client rpc, the request content is freed and cleared,
// terrno is set to TSDB_CODE_NODE_OFFLINE and -1 is returned.
static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
  SDnode *pDnode = pWrapper->pDnode;

  bool canSend = pDnode->status == DND_STAT_RUNNING && pDnode->trans.clientRpc != NULL;
  if (canSend) {
    rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
    return 0;
  }

  rpcFreeCont(pReq->pCont);
  pReq->pCont = NULL;
  terrno = TSDB_CODE_NODE_OFFLINE;
  dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->info.handle);
  return -1;
}

S
Shengliang Guan 已提交
280 281
static inline void dmSendRsp(const SRpcMsg *pRsp) {
  SMgmtWrapper *pWrapper = pRsp->info.wrapper;
282
  if (InChildProc(pWrapper->proc.ptype)) {
S
Shengliang Guan 已提交
283
    dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
284 285
  } else {
    dmSendRpcRsp(pWrapper->pDnode, pRsp);
S
shm  
Shengliang Guan 已提交
286
  }
D
dapan1121 已提交
287
}
S
Shengliang Guan 已提交
288

M
Minghao Li 已提交
289
// Answer with a redirect to *pNewEpSet.
// In a child process pRsp is put on the proc queue unchanged (DND_FUNC_RSP).
// Otherwise a new TSDB_CODE_RPC_REDIRECT response is built that reuses
// pRsp->info and carries the serialized endpoint set as its body.
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
  if (InChildProc(pWrapper->proc.ptype)) {
    dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
    return;
  }

  SMEpSet msg = {.epSet = *pNewEpSet};

  // First pass with a NULL buffer only computes the encoded length.
  int32_t bodyLen = tSerializeSMEpSet(NULL, 0, &msg);

  SRpcMsg redirect = {0};
  redirect.pCont = rpcMallocCont(bodyLen);
  redirect.contLen = bodyLen;
  tSerializeSMEpSet(redirect.pCont, bodyLen, &msg);

  redirect.code = TSDB_CODE_RPC_REDIRECT;
  redirect.info = pRsp->info;
  rpcSendResponse(&redirect);
}

S
Shengliang Guan 已提交
306
// Register pMsg as the broken-link argument: forwarded through the proc queue
// (DND_FUNC_REGIST) in a child process, registered with rpc directly otherwise.
static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
  if (InChildProc(pWrapper->proc.ptype)) {
    dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST);
    return;
  }

  rpcRegisterBrokenLinkArg(pMsg);
}

S
Shengliang Guan 已提交
314
// Release an rpc handle. In a child process the request is forwarded through
// the proc queue (DND_FUNC_RELEASE) as a message whose info.handle is the
// handle and whose code field carries `type`; otherwise rpcReleaseHandle is
// called directly.
static inline void dmReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
  if (InChildProc(pWrapper->proc.ptype)) {
    SRpcMsg releaseMsg = {.info.handle = handle, .code = type};
    dmPutToProcPQueue(&pWrapper->proc, &releaseMsg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE);
    return;
  }

  rpcReleaseHandle(handle, type);
}

323
static bool rpcRfp(int32_t code) {
M
Minghao Li 已提交
324 325 326 327 328 329 330
  if (code == TSDB_CODE_RPC_REDIRECT) {
    return true;
  } else {
    return false;
  }
}

331
// Open the dnode's rpc client and store it in pDnode->trans.clientRpc.
// The client authenticates with the internal user/ckey and the encrypted
// internal secret, and delivers incoming messages to dmProcessRpcMsg.
//
// Returns 0 on success, -1 if rpcOpen fails.
int32_t dmInitClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  // Encrypt the internal secret first so it can be referenced by the init struct.
  char pass[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);

  SRpcInit rpcInit = {
      .label = "DND",
      .numOfThreads = 1,
      .cfp = (RpcCfp)dmProcessRpcMsg,
      .sessions = 1024,
      .connType = TAOS_CONN_CLIENT,
      .idleTime = tsShellActivityTimer * 1000,
      .user = INTERNAL_USER,
      .ckey = INTERNAL_CKEY,
      .spi = 1,
      .parent = pDnode,
      .rfp = rpcRfp,
      .secret = pass,
  };

  pTrans->clientRpc = rpcOpen(&rpcInit);
  if (pTrans->clientRpc != NULL) {
    dDebug("dnode rpc client is initialized");
    return 0;
  }

  dError("failed to init dnode rpc client");
  return -1;
}

361
// Close the dnode's rpc client if one is open and clear the stored pointer.
void dmCleanupClient(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (!pTrans->clientRpc) {
    return;
  }

  rpcClose(pTrans->clientRpc);
  pTrans->clientRpc = NULL;
  dDebug("dnode rpc client is closed");
}

S
Shengliang Guan 已提交
370 371
// Resolve authentication data for the built-in accounts without asking a mnode.
//
// Recognized users are INTERNAL_USER (secret derived from INTERNAL_SECRET) and
// TSDB_NETTEST_USER (secret derived from the user name itself). For those,
// TSDB_PASSWORD_LEN bytes of the encrypted secret are copied to `secret`,
// *spi is set to 1 and *encrypt and *ckey are set to 0.
//
// Returns 0 for a recognized user, -1 otherwise (outputs untouched).
// pDnode is not used.
static inline int32_t dmGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
                                        char *ckey) {
  char        pass[TSDB_PASSWORD_LEN + 1] = {0};
  const char *plain = NULL;

  if (strcmp(user, INTERNAL_USER) == 0) {
    plain = INTERNAL_SECRET;
  } else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
    plain = TSDB_NETTEST_USER;
  }

  if (plain == NULL) {
    return -1;
  }

  taosEncryptPass_c((uint8_t *)plain, strlen(plain), pass);

  memcpy(secret, pass, TSDB_PASSWORD_LEN);
  *spi = 1;
  *encrypt = 0;
  *ckey = 0;
  return 0;
}

S
Shengliang Guan 已提交
393 394 395
// Auth callback (installed as rpcInit.afp by dmInitServer): fetch spi, encrypt,
// secret and ckey for `user`.
//
// Built-in accounts are answered locally via dmGetHideUserAuth. For any other
// user a TDMT_MND_AUTH request is sent synchronously to the known mnode
// endpoint set and the outputs are filled from the reply.
//
// Returns 0 on success; otherwise the response code of the mnode round trip,
// which is also stored in terrno.
static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
                                             char *ckey) {
  if (dmGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) {
    dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
    return 0;
  }

  SAuthReq authReq = {0};
  tstrncpy(authReq.user, user, TSDB_USER_LEN);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t reqLen = tSerializeSAuthReq(NULL, 0, &authReq);
  void   *pBody = rpcMallocCont(reqLen);
  tSerializeSAuthReq(pBody, reqLen, &authReq);

  SRpcMsg rpcMsg = {.pCont = pBody, .contLen = reqLen, .msgType = TDMT_MND_AUTH, .info.ahandle = (void *)9528};
  SRpcMsg rpcRsp = {0};
  SEpSet  mnodeEps = {0};

  dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
  dmGetMnodeEpSet(pDnode, &mnodeEps);
  dmSendRecv(pDnode, &mnodeEps, &rpcMsg, &rpcRsp);

  if (rpcRsp.code == 0) {
    SAuthRsp authRsp = {0};
    tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);

    memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
    memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
    *spi = authRsp.spi;
    *encrypt = authRsp.encrypt;

    dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
           authRsp.encrypt);
  } else {
    terrno = rpcRsp.code;
    dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
}

431
// Open the dnode's rpc server on the configured local fqdn/port and store it
// in pDnode->trans.serverRpc. Incoming messages are delivered to
// dmProcessRpcMsg and user authentication is resolved by
// dmRetrieveUserAuthInfo.
//
// Returns 0 on success, -1 if rpcOpen fails.
int32_t dmInitServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;

  SRpcInit rpcInit = {0};

  // Bound the copy by the destination buffer, not by the source length: a
  // length taken from the source gives no overflow protection and never
  // writes a terminator.
  // NOTE(review): assumes tstrncpy copies at most size-1 bytes and always
  // NUL-terminates, as its other use in this file suggests -- confirm.
  tstrncpy(rpcInit.localFqdn, pDnode->data.localFqdn, sizeof(rpcInit.localFqdn));
  rpcInit.localPort = pDnode->data.serverPort;
  rpcInit.label = "DND";
  rpcInit.numOfThreads = tsNumOfRpcThreads;
  rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
  rpcInit.sessions = tsMaxShellConns;
  rpcInit.connType = TAOS_CONN_SERVER;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.afp = (RpcAfp)dmRetrieveUserAuthInfo;
  rpcInit.parent = pDnode;

  pTrans->serverRpc = rpcOpen(&rpcInit);
  if (pTrans->serverRpc == NULL) {
    dError("failed to init dnode rpc server");
    return -1;
  }

  dDebug("dnode rpc server is initialized");
  return 0;
}

457
// Close the dnode's rpc server if one is open and clear the stored pointer.
void dmCleanupServer(SDnode *pDnode) {
  SDnodeTrans *pTrans = &pDnode->trans;
  if (!pTrans->serverRpc) {
    return;
  }

  rpcClose(pTrans->serverRpc);
  pTrans->serverRpc = NULL;
  dDebug("dnode rpc server is closed");
}

S
Shengliang Guan 已提交
466 467
// Build the message callback table handed to a management node: it binds the
// wrapper, the dnode's client rpc and the dm* transport helpers of this file.
// Fields not listed here are left zeroed.
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
  SMsgCb callbacks = {0};

  callbacks.pWrapper = pWrapper;
  callbacks.clientRpc = pWrapper->pDnode->trans.clientRpc;

  callbacks.sendReqFp = dmSendReq;
  callbacks.sendRspFp = dmSendRsp;
  callbacks.sendMnodeRecvFp = dmSendToMnodeRecv;
  callbacks.sendRedirectRspFp = dmSendRedirectRsp;
  callbacks.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg;
  callbacks.releaseHandleFp = dmReleaseHandle;
  callbacks.reportStartupFp = dmReportStartupByWrapper;

  return callbacks;
}